<a href="https://colab.research.google.com/github/CristianPachacama/political_parties_thw/blob/main/data_science_political_parties.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Instructions

## Survey Analysis

As a data scientist, you are required to analyse the political landscape of Europe using the **Chapel Hill Expert Survey** dataset. The dataset provides insights into the positioning of **277 political parties** in Europe based on **55 different attributes**. The dataset can be downloaded [here](https://www.chesdata.eu/2019-chapel-hill-expert-survey) and the [codebook](https://static1.squarespace.com/static/5975c9bfdb29d6a05c65209b/t/5fa04ec05d3c8218b7c91450/1604341440585/2019_CHES_codebook.pdf) provides further information on the survey attributes.

This repository contains the necessary setup and code base to help guide you in performing an analysis using different statistical methods.

## Project Setup

### Pre-requisites

Please make sure you have a Google account so that you can use [Google Colaboratory](https://colab.research.google.com/).

### Install all Python dependencies

Google Colaboratory already ships with the most common dependencies used by data scientists. If you need to install additional dependencies, you can do so by running a command like the following in an empty cell:

```bash
!pip install <python-package-name>
```

## Gearing Up for the Pairing Session

Please be sure to complete the below tasks before the pairing session.

1. Get a high-level understanding of the dataset by reading the [codebook](https://static1.squarespace.com/static/5975c9bfdb29d6a05c65209b/t/5fa04ec05d3c8218b7c91450/1604341440585/2019_CHES_codebook.pdf) and, if necessary, downloading the dataset.
2. Have your coding environment ready by installing Python and Poetry.
3. Ensure that you are able to run all commands mentioned in this README (except for pytest errors).

**Please note that you DO NOT have to complete the code/tasks in this notebook. It is meant to be done together during the pairing session.**

---

# Import modules

In [None]:
from typing import List, Optional

import os
from pathlib import Path
from itertools import cycle
from urllib.request import urlretrieve
import logging

import sqlite3
import numpy as np
import pandas as pd
import matplotlib as mpl
from matplotlib import pyplot

from sklearn import decomposition, mixture

In [None]:
logger = logging.getLogger('my_logger')
logging.basicConfig(level=logging.DEBUG)

# 0. Data loading
Two classes are implemented here: `DataExtractor` and `EuropeanParties`.

The `DataExtractor` class downloads and cleans the data, whilst the `EuropeanParties` class connects to the SQLite database and is used to manipulate the raw table, which is named `raw_data` in the database.

After loading the data into the database, you are required to compute some descriptive statistics using the `raw_data` table.

In [None]:
class DataExtractor:
    """A class to extract data from internet"""
    data_url: str = "https://cadmus.eui.eu/bitstream/handle/1814/69975/CHES_TREND_1999-2019.dta?sequence=6&isAllowed=y"

    def __init__(self):
        self.party_data = self._download_data()
        self.non_features = []
        self.index = ["party_id", "party", "country"]

    def _download_data(self) -> pd.DataFrame:

        logging.debug(f"Extracting data from {self.data_url}")
        Path("data").mkdir(exist_ok=True)
        data_path, _ = urlretrieve(
          self.data_url,
          Path("data").joinpath(*["CHES2019V3.dta"]),
        )
        raw_data =  pd.read_stata(data_path)
        clean_raw_data = self._clean_data(raw_data)
        for c in list(clean_raw_data.dtypes[clean_raw_data.dtypes == 'category'].index):
            clean_raw_data[c] = clean_raw_data[c].astype(str)
        logging.debug("Data extracted...")

        return clean_raw_data

    def _clean_data(self, df: pd.DataFrame):
        df = df.copy()
        mapper = {"eastwest_map": {
              'east': 1,
              'west': 0
            },
            "eumember_map": {
                'other': 0,
                'EU member state': 1
            },
            "eu_position_map": {
              "strongly opposed": 1,
              "opposed": 2,
              "somewhat opposed": 3,
              "neutral": 4,
              "somewhat in favor": 5,
              "in favor": 6,
              "strongly in favor": 7
            },
            "eu_intmark_map": {
                "neutral toward expanding EU powers on the internal market": 3,
                "stongly favors explanding EU powers on the internal market": 7,
                "stongly opposes expanding EU powers on the internal market": 1
                },
            "eu_cohesion_map": {
                "neutral towards the EU's cohesion policy": 3,
                "strongly favors the EU's cohesion policy": 7,
                "strongly opposes the EU's cohesion policy": 1
                },
            "eu_asylum_map": {
                "neutral towards a common policy on political asylum": 3,
                "strongly favors a common policy on political asylum": 7,
                "strongly opposes a common policy on political asylum": 1
                },
            "eu_foreign_map": {
                "neutral towards a common foreign and security policy": 3,
                "strongly favors a common foreign and security policy": 7,
                "strongly opposes a common foreign and security policy": 1
                },
            "lrgen_map": {
                "extreme left": 0,
                'center': 5,
                'extreme right': 10,
            },
            "lrecon_map": {
                "extreme left": 0,
                'center': 5,
                'extreme right': 10,
            },
            "galtan_map": {
                'center': 5,
                'extreme tan': 10,
                },
            "multicult_salience_map": {
                "extremely important": 10}}

        exclude = ['family', 'govt', 'eu_ep', 'eu_fiscal', 'eu_employ', 'eu_agri', 'eu_environ', 'eu_turkey', 'mip_one', 'mip_two', 'mip_three', 'multicult_sal', '_mergexxx', 'chesversion', 'year', 'eumember', 'expert']

        for k, m in mapper.items():
            col = k.replace('_map', '')
            df[col] = df[col].replace(m)
            df[col] = df[col].astype(float)

        df = df[df['year'] == 2019]
        all_nan_values = pd.isnull(df).sum()
        exclude_all_nan_cols = list(all_nan_values[all_nan_values == df.shape[0]].index)
        include = [col for col in df.columns if col not in (exclude + exclude_all_nan_cols)]
        df = df[include]
        return df

In [None]:
class EuropeanParties:
    """ Database connector class"""
    def __init__(self):
        self._conn = self._create_connection()
        self._cursor = self._conn.cursor()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _create_connection(self):
        import math
        conn =  sqlite3.connect("data/political-parties.db")
        conn.create_function('SQRT', 1, math.sqrt)
        conn.create_aggregate('VAR', 1, Variance)

        return conn

    def close(self):
        self._conn.close()

    def create_and_insert_table(self, dataframe: pd.DataFrame, table_name: str):
        dataframe.to_sql(table_name, self._conn, if_exists="replace", index=False)

    def query(self, query: str) -> pd.DataFrame:
        self._cursor.execute(query)
        column_names = list(map(lambda x: x[0], self._cursor.description))
        data = self._cursor.fetchall()
        return pd.DataFrame(data, columns=column_names)

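class Variance:
    """SQLite aggregate that computes the sample variance of a column."""

    def __init__(self):
        self.n = 0
        self.sum = 0
        self.sum_squared = 0

    def step(self, value):
        # Skip NULLs and non-numeric values so they do not distort the count.
        if not isinstance(value, (int, float)):
            return
        self.n += 1
        self.sum += value
        self.sum_squared += value ** 2

    def finalize(self):
        # The sample variance is undefined for fewer than two values.
        if self.n < 2:
            return None
        avg_x_squared = self.sum_squared / self.n
        avg_x = self.sum / self.n
        # The n / (n - 1) factor converts the population variance to the sample variance.
        variance = (avg_x_squared - (avg_x ** 2)) * (self.n / (self.n - 1))
        return variance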

## 0.1. Extracting data and inserting in a table

In [None]:
extractor = DataExtractor()
raw_data = extractor.party_data
with EuropeanParties() as db:
    db.create_and_insert_table(raw_data, "raw_data")

all_data_query = """
    SELECT *
    FROM raw_data
"""
with EuropeanParties() as db:
    data = db.query(all_data_query)
display(data.head())

assert data.shape == raw_data.shape

## 0.2. Raw data descriptive statistics
> Show some descriptive statistics
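
As a rough illustration of what such a summary could look like, the sketch below uses a small made-up frame in place of the real table. The `toy` frame and its values are assumptions for illustration only; in the notebook the same calls would be applied to `raw_data`.

```python
import numpy as np
import pandas as pd

# Hypothetical stand-in for the raw table (values are invented).
toy = pd.DataFrame(
    {
        "country": ["be", "be", "de", "de", "fr"],
        "party": ["A", "B", "C", "D", "E"],
        "lrgen": [2.5, 7.0, 5.0, np.nan, 8.5],
        "galtan": [3.0, 6.5, np.nan, 4.0, 9.0],
    }
)

# Count, mean, standard deviation, and quartiles for each numeric column.
summary = toy.describe()

# Share of missing values per column, useful before deciding how to fill them.
missing_share = toy.isna().mean()

# Number of parties per country.
parties_per_country = toy.groupby("country")["party"].count()

print(summary)
print(missing_share)
print(parties_per_country)
```

The missing-value shares are worth checking early, because the processing step later fills nulls with zeros, which shifts the mean of sparsely populated columns.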

In [None]:
##### YOUR CODE GOES HERE #####


# 1. Data processing

In this task you are required to implement the methods of the `DataLoader` class that process the data. The required processing steps, to be written in SQL, are:  

>**1.1. Duplicate rows removal**  
>
>   Remove rows that are exact duplicates of another row (every column equal), keeping only one copy of each in the dataset.   
>
>**1.2. Handle NaN values.**
>
>   Fill the null values with zeros.
>
>**1.3. Scaling data.**  
>
>   Scale all the features in order for them to have zero mean and unit variance, i.e. $\overline{x} = 0$  and $\sigma^{2} = 1$.
>
>**1.4. Non-feature columns removal and dataframe index setting.**  
>   Remove a list of columns, passed as an argument to the function/method, from the dataset. Additionally, set another list of columns, also passed as an argument, as the index of the dataframe.  
>
Finally, after implementing the steps above, put them all together inside the `preprocess_data` method.
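
One possible way to chain the three SQL steps is shown below on an in-memory toy table. This is a sketch under assumptions, not the expected solution: the `toy` table, its single `lrgen` column, and the `SampleVariance` helper are hypothetical, and `SQRT` and `VAR` are registered here only to mirror the functions that `EuropeanParties` makes available.

```python
import math
import sqlite3


class SampleVariance:
    """Hypothetical sample-variance aggregate, registered below as VAR."""

    def __init__(self):
        self.values = []

    def step(self, value):
        if value is not None:
            self.values.append(value)

    def finalize(self):
        n = len(self.values)
        if n < 2:
            return None
        mean = sum(self.values) / n
        return sum((v - mean) ** 2 for v in self.values) / (n - 1)


conn = sqlite3.connect(":memory:")
conn.create_function("SQRT", 1, math.sqrt)
conn.create_aggregate("VAR", 1, SampleVariance)

# Toy table: one exact duplicate row and one NULL value.
conn.execute("CREATE TABLE toy (party TEXT, lrgen REAL)")
conn.executemany(
    "INSERT INTO toy VALUES (?, ?)",
    [("A", 2.0), ("A", 2.0), ("B", 4.0), ("C", None), ("D", 6.0)],
)

query = """
WITH deduplicated AS (
    -- Step 1: keep one copy of each fully identical row.
    SELECT DISTINCT * FROM toy
),
filled AS (
    -- Step 2: replace NULLs with zero.
    SELECT party, COALESCE(lrgen, 0) AS lrgen FROM deduplicated
),
stats AS (
    -- Step 3a: column mean and standard deviation.
    SELECT AVG(lrgen) AS mean_lrgen, SQRT(VAR(lrgen)) AS std_lrgen FROM filled
)
-- Step 3b: subtract the mean and divide by the standard deviation.
SELECT party, (lrgen - mean_lrgen) / std_lrgen AS lrgen
FROM filled CROSS JOIN stats
ORDER BY party
"""
rows = conn.execute(query).fetchall()
conn.close()
print(rows)
```

Because `VAR` here is a sample variance, the scaled column has unit sample variance rather than unit population variance. For the real table the same pattern would have to be generated for every numeric column, for example by building the `SELECT` list from `numeric_features`.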

In [None]:
class DataLoader:
    """Class to load the political parties dataset"""
    def __init__(self, connector: EuropeanParties, table_columns: List, numeric_features: List):
        self.connector = connector
        self.table_columns = table_columns
        self.non_features = []
        self.numeric_features = numeric_features
        self.index = ["party_id", "party", "country"]
        self.non_numeric_columns = [col for col in table_columns if col not in numeric_features]


    def _preprocess_query(self) -> str:
        """Write a function to combine the drop duplicates,
        handle nan values and scale feature queries."""
        ##### YOUR CODE GOES HERE #####
        pass

    def _drop_duplicates_query(self, table_name: str) -> str:
        """Write a function to remove duplicates in the DataBase table"""
        ##### YOUR CODE GOES HERE #####
        pass

    def _scale_features_query(self, table_name: str) -> str:
        """Scale features to have zero mean and unit variance.
        This function must return a SQL query"""
        # Note: the SQRT and VAR SQL functions are registered in EuropeanParties._create_connection
        ##### YOUR CODE GOES HERE #####
        pass

    def _handle_NaN_values_query(self, table_name: str) -> str:
        """Write a function to handle NaN values.
        This function must return a SQL query"""
        ##### YOUR CODE GOES HERE #####
        pass

    def remove_nonfeature_cols(
        self, df: pd.DataFrame, non_features: List[str], index: List[str]
    ) -> pd.DataFrame:
        """Write a function to remove certain features cols and set certain cols as
        indices in a dataframe"""
        ##### YOUR CODE GOES HERE #####
        pass

    def preprocess_data(self) -> pd.DataFrame:
        """Write a function to combine all pre-processing steps for the dataset"""
        ##### YOUR CODE GOES HERE #####

        ###############################
        self.party_data = ...



class TestDataLoader:
    def __init__(self, connector: EuropeanParties, raw_data: pd.DataFrame, data_loader: DataLoader):
        self.connector = connector
        self.raw_data = raw_data
        self.data_loader = data_loader

    def check_drop_duplicates_query(self):
        with self.connector() as db:
            data = db.query(self.data_loader._drop_duplicates_query('raw_data'))
        logging.debug("1. Checking drop duplicates query")
        assert data.shape == (247, 54)
        logging.debug("1. Drop duplicates passed")
        # display(data)

    def check_handle_NaN_values_query(self):
        with self.connector() as db:
            data = db.query(self.data_loader._handle_NaN_values_query('raw_data'))
        logging.debug("2. Checking handle NaN values query")
        assert pd.isnull(data).sum().sum() == 0
        logging.debug("2. Handle NaN values passed")
        # display(data)

    def check_scale_features_query(self):
        from numpy.testing import assert_allclose
        logging.debug("3. Checking scale features query")
        with self.connector() as db:
            data = db.query(self.data_loader._scale_features_query('raw_data'))
        valid_cols = [col for col in data.columns if col != 'party_id']
        assert_allclose(data[valid_cols].mean(numeric_only=True), 0, atol=1e-07)
        logging.debug("3. Scale features passed")
        # display(data.head())


In [None]:
numeric_features = list([col for col in raw_data.select_dtypes(exclude=['object', "category"]).columns if col != 'party_id'])
table_columns = list(raw_data.columns)
data_loader = DataLoader(connector=EuropeanParties, table_columns=table_columns, numeric_features=numeric_features)

test_data_loader = TestDataLoader(connector=EuropeanParties, raw_data=raw_data, data_loader=data_loader)
test_data_loader.check_drop_duplicates_query()
test_data_loader.check_handle_NaN_values_query()
test_data_loader.check_scale_features_query()

## 1.1. Data processing and descriptive statistics
> Run the data processing and then show some descriptive statistics for the processed data

In [None]:
numeric_features = list([col for col in raw_data.select_dtypes(exclude=['object', "category"]).columns if col != 'party_id'])
table_columns = list(raw_data.columns)
data_loader = DataLoader(connector=EuropeanParties, table_columns=table_columns, numeric_features=numeric_features)
data_loader.preprocess_data()
sanitized_data = data_loader.party_data
##### YOUR CODE GOES HERE #####

###############################

# 2. Dimensionality reduction

Now you will need to implement the class `DimensionalityReducer` in order to project the data onto lower dimensional spaces.

Additionally, implement the `scatter_plot` function to visualize the data points projected onto $\mathbb{R}^{2}$.
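
The projection step could be sketched with principal component analysis, one of several reduction methods available in `sklearn.decomposition`. The choice of PCA, the random `features` frame, and the column names `component_1` and `component_2` are assumptions made for this illustration; the notebook does not prescribe a particular method.

```python
import numpy as np
import pandas as pd
from sklearn import decomposition

rng = np.random.default_rng(0)

# Hypothetical standardized feature matrix: 50 "parties" x 6 features.
features = pd.DataFrame(
    rng.normal(size=(50, 6)),
    columns=[f"feature_{i}" for i in range(6)],
)

# Fit PCA and project every row onto the first two principal components.
pca = decomposition.PCA(n_components=2)
projected = pd.DataFrame(
    pca.fit_transform(features),
    index=features.index,
    columns=["component_1", "component_2"],
)

# Fraction of the total variance retained by each component.
print(pca.explained_variance_ratio_)

# Approximate reconstruction in the original 6-dimensional space.
reconstructed = pca.inverse_transform(projected.to_numpy())
print(reconstructed.shape)
```

The two columns of `projected` are what a 2D scatter plot would use as its x and y coordinates. A linear method such as PCA also provides `inverse_transform`, which matters later when sampled points have to be mapped back to the original space.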

In [None]:
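# force=True is required because basicConfig is a no-op once the root logger is configured.
logging.basicConfig(level=logging.INFO, force=True)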

In [None]:
class DimensionalityReducer:
    """Class to model a dimensionality reduction method for the given dataset.
    1. Write a function to convert the high dimensional data to 2 dimensional.
    """

    def __init__(self, data: pd.DataFrame, n_components: int = 2):
        self.n_components = n_components
        self.data = data
        self.feature_columns = data.columns
        self.model = self._model()

    ##### YOUR CODE GOES HERE #####


In [None]:
def scatter_plot(
    transformed_data: pd.DataFrame,
    x: str,
    y: str,
    color: str,
    splot: pyplot.subplot = None,
    labels: Optional[List[str]] = None
):
    """Write a function to generate a 2D scatter plot."""

    ##### YOUR CODE GOES HERE #####


## 2.1. Dimensionality reduction

### 2.1.1. Model training  

> Run the dimensionality reduction model training and then obtain the projected dataset.  

In [None]:
##### YOUR CODE GOES HERE #####

###############################

reduced_dim_data = dim_reducer.transform(data_loader.party_data)

> Plot the projected data on a scatter plot.

In [None]:
##### YOUR CODE GOES HERE #####

###############################

> Now plot the projected data so that left and right parties have different colors.

In [None]:
##### YOUR CODE GOES HERE #####

###############################

# 3. Density estimation

In this step you are going to implement the class `DensityEstimator`. The main goal is to estimate the unobserved underlying probability density function from the data. The class must also contain a method for generating theoretical parties, _i.e._ party data sampled from the density estimator.

Note that the dimensionality reduction model is passed as an argument to this class. You must estimate the density function for the lower-dimensional data. Later in the test you will need to sample data from the estimated density function and then map it back to its original space.

Furthermore, for `plot_density_estimation_results`, which visualizes the estimated density's confidence regions, the only modification you are required to make is adapting the `scatter_plot` function call to the signature you chose in your implementation.
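
The overall flow described above (fit a density on the low-dimensional data, sample from it, map the samples back) can be sketched as follows. A Gaussian mixture from `sklearn.mixture` and a PCA projection are assumed here purely for illustration, and the `high_dim` array is synthetic; the actual `DensityEstimator` may use a different model.

```python
import numpy as np
from sklearn import decomposition, mixture

rng = np.random.default_rng(42)

# Hypothetical high-dimensional data: two blobs in 5 dimensions.
high_dim = np.vstack(
    [rng.normal(-2.0, 1.0, size=(60, 5)), rng.normal(2.0, 1.0, size=(60, 5))]
)

# Project to 2D first; the density is estimated in this reduced space.
pca = decomposition.PCA(n_components=2).fit(high_dim)
low_dim = pca.transform(high_dim)

# Fit a two-component Gaussian mixture with full covariance matrices.
gmm = mixture.GaussianMixture(
    n_components=2, covariance_type="full", random_state=42
)
gmm.fit(low_dim)

# Draw 10 synthetic points; sample() also returns the component of each draw.
samples_2d, component_ids = gmm.sample(n_samples=10)

# Map the sampled points back to the original 5-dimensional space.
samples_high_dim = pca.inverse_transform(samples_2d)
print(samples_high_dim.shape)
```

The fitted `gmm.means_` and `gmm.covariances_` arrays have the shapes that `plot_density_estimation_results` expects for its `means` and `covariances` arguments when the covariance type is `"full"`.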

In [None]:
import pandas as pd


class DensityEstimator:
    """Class to estimate Density/Distribution of the given data.
    1. Write a function to model the distribution of the political party dataset
    2. Write a function to randomly sample 10 parties from this distribution
    3. Map the randomly sampled 10 parties back to the original higher dimensional
    space as per the previously used dimensionality reduction technique.
    """

    def __init__(
        self, data: pd.DataFrame, dim_reducer, high_dim_feature_names, seed=42
    ):
        self.data = data
        self.dim_reducer_model = dim_reducer.model
        self.feature_names = high_dim_feature_names
        self.seed = seed
        self.model = self._model()

    ##### YOUR CODE GOES HERE #####


In [None]:
def plot_density_estimation_results(
    X: pd.DataFrame,
    Y_: np.ndarray,
    means: np.ndarray,
    covariances: np.ndarray,
    title: str,
):
    """Use this function to plot the estimated distribution"""
    color_iter = cycle(["navy", "c", "cornflowerblue", "gold", "darkorange", "g"])
    pyplot.figure()
    splot = pyplot.subplot()
    for i, (mean, covar, color) in enumerate(zip(means, covariances, color_iter)):
        v, w = np.linalg.eigh(covar)
        v = 2.0 * np.sqrt(2.0) * np.sqrt(v)
        u = w[0] / np.linalg.norm(w[0])
        if not np.any(Y_ == i):
            continue
        ##### YOUR CODE GOES HERE #####

        ###############################
        angle = np.arctan(u[1] / u[0])
        angle = 180.0 * angle / np.pi
        # angle must be passed by keyword in recent matplotlib versions
        ell = mpl.patches.Ellipse(mean, v[0], v[1], angle=180.0 + angle, color=color)
        ell.set_clip_box(splot.bbox)
        ell.set_alpha(0.5)
        splot.add_artist(ell)
    pyplot.title(title)

## 3.1. Model training
> Execute training for the density estimation model.

In [None]:
##### YOUR CODE GOES HERE #####

###############################

## 3.2. Sampling from the distribution

> Draw 1,000 samples from your density estimation model and then project them back to the original space of the data.  

In [None]:
##### YOUR CODE GOES HERE #####

###############################

## 3.3. Label the party data

> Label the original data according to the estimated density components. Also label the projected data.   
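
A minimal sketch of the labelling step, assuming a Gaussian mixture fitted on a PCA projection: each row receives the index of its most likely mixture component, and the same label vector is attached to both the original and the projected frame. The frames, column names, and the `label` column are hypothetical.

```python
import numpy as np
import pandas as pd
from sklearn import decomposition, mixture

rng = np.random.default_rng(7)

# Hypothetical original data: two well-separated groups in 4 dimensions.
original = pd.DataFrame(
    np.vstack(
        [rng.normal(-3.0, 1.0, size=(40, 4)), rng.normal(3.0, 1.0, size=(40, 4))]
    ),
    columns=["f1", "f2", "f3", "f4"],
)

# Projected data shares the index of the original data, so labels line up row by row.
projected = pd.DataFrame(
    decomposition.PCA(n_components=2).fit_transform(original),
    index=original.index,
    columns=["component_1", "component_2"],
)

gmm = mixture.GaussianMixture(n_components=2, random_state=7).fit(projected)

# predict() returns, for each row, the component with the highest posterior probability.
labels = gmm.predict(projected)

labelled_projected = projected.assign(label=labels)
labelled_original = original.assign(label=labels)
print(labelled_original["label"].value_counts())
```

Keeping the index aligned between the two frames is what allows one label vector to serve both; `gmm.predict_proba` could be used instead when soft assignments are of interest.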

In [None]:
##### YOUR CODE GOES HERE #####

###############################

## 3.4. Plot the confidence regions

> Plot the estimated confidence regions for each of the components of the trained density estimator alongside the real (projected) data in order to have an intuition about the model performance.

In [None]:
##### YOUR CODE GOES HERE #####

###############################