<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Instructions" data-toc-modified-id="Instructions-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Instructions</a></span></li><li><span><a href="#Workflow-Summary" data-toc-modified-id="Workflow-Summary-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Workflow Summary</a></span></li><li><span><a href="#Preliminaries:-library-imports" data-toc-modified-id="Preliminaries:-library-imports-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Preliminaries: library imports</a></span></li><li><span><a href="#Data-loading" data-toc-modified-id="Data-loading-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Data loading</a></span></li><li><span><a href="#Data-dictionary" data-toc-modified-id="Data-dictionary-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Data dictionary</a></span><ul class="toc-item"><li><span><a href="#location" data-toc-modified-id="location-5.1"><span class="toc-item-num">5.1&nbsp;&nbsp;</span><code>location</code></a></span></li><li><span><a href="#psgc" data-toc-modified-id="psgc-5.2"><span class="toc-item-num">5.2&nbsp;&nbsp;</span><code>psgc</code></a></span></li></ul></li><li><span><a href="#Cleaning:-location-files" data-toc-modified-id="Cleaning:-location-files-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Cleaning: location files</a></span><ul class="toc-item"><li><span><a href="#first-pass-cleaning-:-provincial-level" data-toc-modified-id="first-pass-cleaning-:-provincial-level-6.1"><span class="toc-item-num">6.1&nbsp;&nbsp;</span>first pass cleaning : provincial level</a></span></li><li><span><a href="#second-pass-cleaning:-municipal-level" data-toc-modified-id="second-pass-cleaning:-municipal-level-6.2"><span class="toc-item-num">6.2&nbsp;&nbsp;</span>second pass cleaning: municipal level</a></span></li><li><span><a href="#third-pass:-barangay-level" data-toc-modified-id="third-pass:-barangay-level-6.3"><span class="toc-item-num">6.3&nbsp;&nbsp;</span>third pass: barangay 
level</a></span></li><li><span><a href="#final-precinct-code-lookup" data-toc-modified-id="final-precinct-code-lookup-6.4"><span class="toc-item-num">6.4&nbsp;&nbsp;</span>final precinct code lookup</a></span></li></ul></li><li><span><a href="#consolidate-lookup-information" data-toc-modified-id="consolidate-lookup-information-7"><span class="toc-item-num">7&nbsp;&nbsp;</span>consolidate lookup information</a></span><ul class="toc-item"><li><span><a href="#from-the-location-tables" data-toc-modified-id="from-the-location-tables-7.1"><span class="toc-item-num">7.1&nbsp;&nbsp;</span>from the location tables</a></span></li><li><span><a href="#save-lookup-tables" data-toc-modified-id="save-lookup-tables-7.2"><span class="toc-item-num">7.2&nbsp;&nbsp;</span>save lookup tables</a></span></li></ul></li><li><span><a href="#Denormalized-results-tables" data-toc-modified-id="Denormalized-results-tables-8"><span class="toc-item-num">8&nbsp;&nbsp;</span>Denormalized results tables</a></span></li></ul></div>

# Instructions

Datasets:
* `ref_table_precinct_locations_PSGC.csv` – lookup table for precincts
* `results_president.csv` – precinct-level election results for the 2016 presidential race
* `results_vice-president.csv` – precinct-level election results for the 2016 vice presidential race

Tasks:
1. Create a denormalized table replacing precinct_code in the results_*.csv files with the columns: region, province, municipality, and barangay. 

2. Create an interesting data visualization using this dataset.

# Workflow Summary

1. Initial data inspection and data dictionary building
2. Data cleaning
3. EDA

Since these datasets are rich in geodata, we'll use GADM shapefiles for plotting later on. This entails cleaning the location tags so that they match the GADM labels. The cleaning workflow is divided into three parts:
1. Exact matching by merging the shape files and the location dataframe
2. Manual cleaning for some items, especially those tagged as overseas votes since they are easily identifiable and we're sure that they're not in our shape files.
3. Fuzzy matching for the rest of the entries.

The result after these steps is a lookup table of precinct codes with the correct GADM location tags so we can easily merge them in Tableau for visualization, and a denormalized table for the `results` file so we can do preliminary EDA on the dataset.
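
The three cleaning stages can be sketched on a toy example. This is a minimal illustration rather than the notebook's implementation: the standard-library `difflib` stands in for `fuzzywuzzy`, and `GADM_NAMES`, `MANUAL` and `match_name` are hypothetical names with made-up sample values (the notebook itself handles Western Samar with a manual rule, it is used here only to trigger the fuzzy branch).

```python
import difflib

# Hypothetical sample of GADM labels and manual overrides (illustrative only).
GADM_NAMES = ["Pangasinan", "Davao del Sur", "Samar"]
MANUAL = {"DAVAO OCCIDENTAL": "Davao del Sur"}

def match_name(raw, gadm_names=GADM_NAMES, manual=MANUAL):
    """Return (gadm_label, stage) for a raw upper-case location tag."""
    # 1. exact match on the upper-cased GADM label
    exact = {name.upper(): name for name in gadm_names}
    if raw in exact:
        return exact[raw], "exact"
    # 2. manual override for known special cases
    if raw in manual:
        return manual[raw], "manual"
    # 3. fuzzy match: closest label by similarity ratio (cutoff=0.0 keeps the best candidate)
    best = difflib.get_close_matches(raw, list(exact), n=1, cutoff=0.0)
    return (exact[best[0]], "fuzzy") if best else (None, "unmatched")

for tag in ["PANGASINAN", "DAVAO OCCIDENTAL", "WESTERN SAMAR"]:
    print(tag, "->", match_name(tag))
```

Running the stages in this order means the fuzzy step only ever sees tags that the cheaper, safer steps could not resolve.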

# Preliminaries: library imports

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

#geodata
import geopandas as gpd

#utilities
import os
import shutil
from glob import glob
from operator import itemgetter
from collections import defaultdict
import json
from tqdm import tqdm

#for fuzzy matching
from fuzzywuzzy import process

# spark imports
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql import Row

sc = SparkContext()
spark = SparkSession(sc)

In [2]:
try:
    os.mkdir(os.path.join('..', 'output'))
except FileExistsError:
    # only swallow the "already exists" case; other errors should surface
    print('folder already exists!')

folder already exists!


# Data loading

In [3]:
shape_files = sorted(glob(os.path.join('..', 'gadm', "gadm*.shp")))[1:]
data_files = sorted(glob(os.path.join('..', 'datasets', '*.csv')))

In [4]:
datasets = dict(map(lambda x: (x.strip(r'.csv').split('_')[-1].lower().replace('-', '_'), x), data_files))
datasets

{'location': '../datasets/ref_table_precinct_locations.csv',
 'psgc': '../datasets/ref_table_precinct_locations_PSGC.csv',
 'president': '../datasets/results_president.csv',
 'vice_president': '../datasets/results_vice-president.csv'}
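
Note that the key for the first file is `location`, not `locations`. `str.strip(r'.csv')` removes any of the characters `.`, `c`, `s`, `v` from both ends of the string rather than the `.csv` suffix, so the trailing `s` of `locations` is stripped as well. The sketch below compares the original expression with a suffix-safe variant; `label_with_strip` and `label_with_splitext` are hypothetical helper names. The rest of the notebook relies on the `location` key, so switching to the suffix-safe version would also require renaming the table in the later queries.

```python
import os

paths = [
    '../datasets/ref_table_precinct_locations.csv',
    '../datasets/ref_table_precinct_locations_PSGC.csv',
    '../datasets/results_president.csv',
    '../datasets/results_vice-president.csv',
]

def label_with_strip(path):
    # original expression: strip() drops leading/trailing '.', 'c', 's', 'v' characters
    return path.strip(r'.csv').split('_')[-1].lower().replace('-', '_')

def label_with_splitext(path):
    # suffix-safe variant: only the file extension is removed
    stem = os.path.splitext(os.path.basename(path))[0]
    return stem.split('_')[-1].lower().replace('-', '_')

strip_labels = [label_with_strip(p) for p in paths]
splitext_labels = [label_with_splitext(p) for p in paths]
print(strip_labels)
print(splitext_labels)
```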

In [5]:
shapes = dict(zip(['provincial', 'municipal', 'brgy'], shape_files))
shapes

{'provincial': '../gadm/gadm36_PHL_1.shp',
 'municipal': '../gadm/gadm36_PHL_2.shp',
 'brgy': '../gadm/gadm36_PHL_3.shp'}

In [6]:
for label, filepath in tqdm(datasets.items()):
    datasets[label] = spark.read.csv(filepath, sep=',', header=True)
#     print(label)
#     display(datasets[label].limit(2).toPandas())
    datasets[label].createOrReplaceTempView(label)

100%|██████████| 4/4 [00:06<00:00,  1.72s/it]


In [7]:
for label, filepath in tqdm(shapes.items()):
#     print(label)
    gdf = gpd.GeoDataFrame.from_file(filepath)
    col_include = list(filter(lambda x: x.startswith('NAME') or x.startswith('VAR'), gdf.columns))
    df = pd.DataFrame(gdf[col_include])
    for col in df.columns:
        df[col] = df[col].astype(str) 
    shapes[label] = spark.createDataFrame(df)
    shapes[label].createOrReplaceTempView(label)
#     display(shapes[label].limit(2).toPandas())

100%|██████████| 3/3 [00:06<00:00,  2.16s/it]


In [8]:
spark.sql("SHOW TABLES").show()

+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
|        |          brgy|       true|
|        |      location|       true|
|        |     municipal|       true|
|        |     president|       true|
|        |    provincial|       true|
|        |          psgc|       true|
|        |vice_president|       true|
+--------+--------------+-----------+



# Data dictionary

```python
{'location': '../datasets/ref_table_precinct_locations.csv',
 'psgc': '../datasets/ref_table_precinct_locations_PSGC.csv',
 'president': '../datasets/results_president.csv',
 'vice_president': '../datasets/results_vice-president.csv'}
```

In [9]:
datasets.keys()

dict_keys(['location', 'psgc', 'president', 'vice_president'])

## `location`
* filename : `ref_table_precinct_locations.csv`

In [10]:
datasets['location'].printSchema()

root
 |-- precinct_code: string (nullable = true)
 |-- region: string (nullable = true)
 |-- province: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- barangay: string (nullable = true)
 |-- registered_voters: string (nullable = true)
 |-- ballots_cast: string (nullable = true)



In [11]:
datasets['location'].show(2)

+-------------+--------+----------+------------+--------+-----------------+------------+
|precinct_code|  region|  province|municipality|barangay|registered_voters|ballots_cast|
+-------------+--------+----------+------------+--------+-----------------+------------+
|     55170026|REGION I|PANGASINAN|    CALASIAO| BUENLAG|              592|         503|
|     55170027|REGION I|PANGASINAN|    CALASIAO| BUENLAG|              526|         458|
+-------------+--------+----------+------------+--------+-----------------+------------+
only showing top 2 rows



## `psgc`

* filename: `ref_table_precinct_locations_PSGC.csv`

**NOTES**
* PSGC = [Philippine Standard Geographic Code](https://psa.gov.ph/classification/psgc/)
* same fields as `location`:
```python
 |-- precinct_code: string (nullable = true)
 |-- region: string (nullable = true)
 |-- province: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- barangay: string (nullable = true)
 |-- registered_voters: string (nullable = true)
 |-- ballots_cast: string (nullable = true)
```

* additional fields:
    * `PSGC_CM`, `province_CM`, `municipality_CM` - PSGC-specific code, goes down to barangay level
    * `is_city` - flag for city, 1 if city, 0 if not
    * `income_class` - municipal level
    * `population` - population, municipal level
    * `land_area` - barangay level


In [16]:
datasets['psgc'].printSchema()

root
 |-- _c0: string (nullable = true)
 |-- precinct_code: string (nullable = true)
 |-- region: string (nullable = true)
 |-- province: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- barangay: string (nullable = true)
 |-- registered_voters: string (nullable = true)
 |-- ballots_cast: string (nullable = true)
 |-- PSGC_CM: string (nullable = true)
 |-- is_city: string (nullable = true)
 |-- income_class: string (nullable = true)
 |-- population: string (nullable = true)
 |-- land_area: string (nullable = true)
 |-- province_CM: string (nullable = true)
 |-- municipality_CM: string (nullable = true)



In [19]:
datasets['psgc'].where('is_city <> 0').toPandas()

Unnamed: 0,_c0,precinct_code,region,province,municipality,barangay,registered_voters,ballots_cast,PSGC_CM,is_city,income_class,population,land_area,province_CM,municipality_CM
0,816,55180077,REGION I,PANGASINAN,DAGUPAN CITY,CARANGLAAN,650,555,015518000,1.0,2nd Class,,3723.0,PANGASINAN,DAGUPAN CITY
1,817,55180075,REGION I,PANGASINAN,DAGUPAN CITY,CARANGLAAN,784,655,015518000,1.0,2nd Class,,3723.0,PANGASINAN,DAGUPAN CITY
2,818,55180073,REGION I,PANGASINAN,DAGUPAN CITY,CARANGLAAN,772,640,015518000,1.0,2nd Class,,3723.0,PANGASINAN,DAGUPAN CITY
3,819,55180071,REGION I,PANGASINAN,DAGUPAN CITY,CARANGLAAN,655,561,015518000,1.0,2nd Class,,3723.0,PANGASINAN,DAGUPAN CITY
4,820,55180076,REGION I,PANGASINAN,DAGUPAN CITY,CARANGLAAN,688,577,015518000,1.0,2nd Class,,3723.0,PANGASINAN,DAGUPAN CITY
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
31993,90618,72010069,REGION IX,ZAMBOANGA DEL NORTE,DAPITAN CITY,SAN PEDRO,462,418,097201000,1.0,3rd Class,,39053.0,ZAMBOANGA DEL NORTE,DAPITAN CITY
31994,90619,72010013,REGION IX,ZAMBOANGA DEL NORTE,DAPITAN CITY,POTOL (POB.),476,435,097201000,1.0,3rd Class,,39053.0,ZAMBOANGA DEL NORTE,DAPITAN CITY
31995,90620,72010014,REGION IX,ZAMBOANGA DEL NORTE,DAPITAN CITY,POTOL (POB.),569,506,097201000,1.0,3rd Class,,39053.0,ZAMBOANGA DEL NORTE,DAPITAN CITY
31996,90621,72010083,REGION IX,ZAMBOANGA DEL NORTE,DAPITAN CITY,TAG-OLO,479,434,097201000,1.0,3rd Class,,39053.0,ZAMBOANGA DEL NORTE,DAPITAN CITY



# Cleaning: location files

## first pass cleaning : provincial level

In [None]:
spark.sql('select distinct region, province from location').count()

In [None]:
lookup = spark.sql("""
            SELECT DISTINCT
                --l.precinct_code, 
                l.region, 
                l.province, 
                --l.municipality,
                --l.barangay,
                p.NAME_1 gadm_province 
                --b.NAME_2 gadm_municipality, 
                --b.NAME_3 gadm_brgy
            FROM location l
            LEFT JOIN provincial p
                ON (UPPER(p.NAME_1) = l.province OR (UPPER(p.VARNAME_1) = l.province))
--                AND UPPER(b.NAME_2) = l.municipality
--                AND UPPER(b.NAME_3) = l.barangay
""")
lookup.createOrReplaceTempView('lookup')

In [None]:
spark.sql('SELECT * FROM lookup WHERE gadm_province IS NULL').toPandas()
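
The exact-match step can be tried on a toy example. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL (an assumption made only so that it runs without a Spark session), and the two-row tables are made up for illustration. Provinces with no GADM counterpart come back with a `NULL` label, which is what the `IS NULL` query above surfaces.

```python
import sqlite3

con = sqlite3.connect(':memory:')
con.executescript("""
    CREATE TABLE location (region TEXT, province TEXT);
    CREATE TABLE provincial (NAME_1 TEXT, VARNAME_1 TEXT);
    INSERT INTO location VALUES
        ('REGION I', 'PANGASINAN'),
        ('REGION XI', 'DAVAO OCCIDENTAL');
    INSERT INTO provincial VALUES
        ('Pangasinan', ''),
        ('Davao del Sur', '');
""")

# Same join condition as the first pass: match on the name or its alternate name.
rows = con.execute("""
    SELECT DISTINCT l.region, l.province, p.NAME_1 AS gadm_province
    FROM location l
    LEFT JOIN provincial p
        ON (UPPER(p.NAME_1) = l.province OR UPPER(p.VARNAME_1) = l.province)
    ORDER BY l.province
""").fetchall()

# Rows without a GADM label are the ones left for manual or fuzzy cleaning.
unmatched = [r for r in rows if r[2] is None]
print(rows)
print(unmatched)
```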

In [None]:
lookup = spark.sql("""
            SELECT 
                --precinct_code,
                region, 
                province,
                CASE 
                    WHEN region = "NCR"
                        THEN "Metropolitan Manila"
                    WHEN region = "OAV"
                        THEN province
                    WHEN province = "DAVAO OCCIDENTAL"
                        THEN "Davao del Sur"
                    WHEN province LIKE "%WESTERN%"
                        THEN "Samar"
                    WHEN province LIKE "%DAVAO DEL NORTE%"
                        THEN "Davao del Norte"
                    WHEN province LIKE "COTABATO%NORTH%"
                        THEN "North Cotabato"
                        ELSE gadm_province
                END as gadm_province
            FROM lookup
""")
lookup.createOrReplaceTempView('lookup')
spark.sql('SELECT * FROM lookup WHERE gadm_province IS NULL').toPandas()

In [None]:
spark.sql('select count(*) from lookup').show()

## second pass cleaning: municipal level

In [None]:
spark.sql('select distinct region, province, municipality from location').count()

In [None]:
lookup = spark.sql("""
            SELECT DISTINCT
                k.region, 
                k.province, 
                k.gadm_province, 
                l.municipality, 
                m.NAME_2 gadm_municipality
            FROM lookup k
            LEFT JOIN (SELECT DISTINCT province, municipality FROM location) as l
                ON l.province = k.province
            LEFT JOIN municipal m
                ON m.NAME_1 = k.gadm_province 
                    AND (
                        UPPER(m.NAME_2) = l.municipality 
                            OR UPPER(m.VARNAME_2) = l.municipality
                        )
""")
lookup.createOrReplaceTempView('lookup')

In [None]:
spark.sql('select count(*) from lookup').show()

In [None]:
spark.sql('SELECT * FROM lookup WHERE gadm_municipality IS NULL').toPandas()

In [None]:
lookup = spark.sql("""
            SELECT 
                --precinct_code,
                region, 
                province,
                gadm_province,
                municipality,
                CASE 
                    WHEN province LIKE "%MANILA"
                        THEN "Manila"
                    WHEN region = "OAV"
                        THEN municipality
                        ELSE gadm_municipality
                END as gadm_municipality
            FROM lookup
""")
lookup.createOrReplaceTempView('lookup')

In [None]:
spark.sql('SELECT * FROM lookup WHERE gadm_municipality IS NULL').toPandas()

In [None]:
spark.sql('select count(*) from lookup').show()

In [None]:
def create_reference_dict(query):
    reference = spark.sql(query).toJSON().collect()
    ref = defaultdict(list)
    for entry in reference:
        items = json.loads(entry)
        if len(items) < 3:
            ref[items['NAME_1']] += [items['NAME_2']]
        else:
            ref[(items['NAME_1'], items['NAME_2'])] += [items['NAME_3']]
    return ref
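
To make the shape of the returned dictionary concrete, here is a Spark-free sketch of the same grouping logic. `build_reference` is a hypothetical helper that takes JSON strings in the form `DataFrame.toJSON()` produces, and the sample names are illustrative only.

```python
import json
from collections import defaultdict

def build_reference(json_rows):
    """Group GADM names by parent, mirroring create_reference_dict without Spark."""
    ref = defaultdict(list)
    for entry in json_rows:
        items = json.loads(entry)
        if len(items) < 3:
            # two fields: province -> list of municipalities
            ref[items['NAME_1']].append(items['NAME_2'])
        else:
            # three fields: (province, municipality) -> list of barangays
            ref[(items['NAME_1'], items['NAME_2'])].append(items['NAME_3'])
    return ref

# Sample rows (values are illustrative, not taken from the shapefiles).
municipal_rows = [
    '{"NAME_1": "Pangasinan", "NAME_2": "Calasiao"}',
    '{"NAME_1": "Pangasinan", "NAME_2": "Dagupan City"}',
]
brgy_rows = ['{"NAME_1": "Pangasinan", "NAME_2": "Calasiao", "NAME_3": "Buenlag"}']

by_province = build_reference(municipal_rows)
by_municipality = build_reference(brgy_rows)
print(dict(by_province))
print(dict(by_municipality))
```

Keying the dictionary by the parent unit is what keeps the later fuzzy match restricted to candidates in the same province (or the same province and municipality).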

In [None]:
query = """
                SELECT NAME_1, NAME_2
                FROM municipal
                WHERE NAME_1 IN (SELECT gadm_province FROM lookup WHERE gadm_municipality IS NULL)
                --AND NAME_2 NOT IN (SELECT gadm_municipality FROM lookup WHERE gadm_municipality IS NOT NULL)
    """
reference = create_reference_dict(query)

In [None]:
def map_municipalities(r):
    # rows that already matched exactly are kept as they are
    if r.gadm_municipality:
        return r
    # fuzzy-match only against GADM municipalities in the same province
    municipalities = reference.get(r.gadm_province, [])
    if not r.municipality or not municipalities:
        return r
    match = process.extractOne(r.municipality, municipalities)
    if match is None:
        return r
    return Row(r.region, r.province, r.gadm_province, r.municipality, match[0])
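
`process.extractOne` returns the best-scoring candidate however weak the score is, so a poor match can slip through unnoticed. A hedged sketch of guarding against that with a minimum similarity follows. It uses `difflib` from the standard library in place of `fuzzywuzzy` (whose `extractOne` accepts a comparable `score_cutoff` argument); `best_match` and the thresholds are hypothetical choices, and the candidate list is illustrative.

```python
import difflib

def best_match(query, candidates, min_ratio=0.8):
    """Return the most similar candidate, or None if nothing reaches min_ratio."""
    scored = [
        (difflib.SequenceMatcher(None, query.upper(), c.upper()).ratio(), c)
        for c in candidates
    ]
    if not scored:
        return None
    ratio, candidate = max(scored)
    return candidate if ratio >= min_ratio else None

municipalities = ["Calasiao", "Dagupan City"]  # illustrative candidate list

close = best_match("DAGUPAN", municipalities, min_ratio=0.7)
far = best_match("CITY OF ILAGAN", municipalities)
empty = best_match("CALASIAO", [])
print(close, far, empty)
```

Rows rejected by the threshold stay unmatched and can be reviewed by hand instead of being silently assigned to the wrong place.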

In [None]:
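# pass the existing schema so rows rebuilt positionally in map_municipalities keep their column names
lookup = lookup.rdd.map(map_municipalities).toDF(lookup.schema)
lookup.createOrReplaceTempView('lookup')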

In [None]:
spark.sql('select * from lookup where gadm_municipality is null').toPandas()

In [None]:
spark.sql('select count(*) from lookup').show()

## third pass: barangay level

In [None]:
spark.sql('select distinct region, province, municipality, barangay from location').count()

In [None]:
lookup = spark.sql("""
            SELECT DISTINCT
                k.region, 
                k.province, 
                k.gadm_province, 
                k.municipality, 
                k.gadm_municipality,
                l.barangay
                --, b.NAME_3 gadm_brgy
            FROM lookup k
            LEFT JOIN location l
                ON l.province = k.province
                AND l.municipality = k.municipality

""")
lookup.createOrReplaceTempView('lookup')

In [None]:
spark.sql('select * from lookup limit 5').show()

In [None]:
lookup = spark.sql("""
            SELECT DISTINCT 
                k.region, 
                k.province, 
                k.gadm_province, 
                k.municipality, 
                k.gadm_municipality, 
                k.barangay, 
                b.NAME_3 gadm_brgy
            FROM lookup k
            LEFT JOIN brgy b
            ON b.NAME_1 = k.gadm_province
            AND b.NAME_2 = k.gadm_municipality
            AND (UPPER(b.NAME_3) = k.barangay)
""")
lookup.createOrReplaceTempView('lookup')

In [None]:
lookup.show(5)

In [None]:
lookup.count()

In [None]:
spark.sql('SELECT count(*) FROM lookup WHERE gadm_brgy IS NULL').show()

In [None]:
lookup = spark.sql("""
            SELECT 
                --precinct_code,
                region, 
                province,
                gadm_province,
                municipality,
                gadm_municipality,
                barangay,
                CASE 
                    WHEN region = "OAV"
                        THEN barangay
                        ELSE gadm_brgy
                END as gadm_brgy
            FROM lookup
""")
lookup.createOrReplaceTempView('lookup')

In [None]:
spark.sql('SELECT count(*) FROM lookup WHERE gadm_brgy IS NULL').show()

In [None]:
query = """
            SELECT NAME_1, NAME_2, NAME_3
            FROM brgy
            WHERE NAME_1 in (SELECT gadm_province FROM lookup WHERE gadm_brgy IS NULL)
            AND NAME_2 in (SELECT gadm_municipality FROM lookup WHERE gadm_brgy IS NULL)
            --AND NAME_2 NOT IN (SELECT gadm_municipality FROM lookup WHERE gadm_municipality IS NOT NULL)
"""
reference = create_reference_dict(query)

In [None]:
len(reference)

In [None]:
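def map_brgys(r):
    # rows that already matched exactly are kept as they are
    if r.gadm_brgy:
        return r
    # fuzzy-match only against GADM barangays in the same province and municipality
    brgys = reference.get((r.gadm_province, r.gadm_municipality), [])
    if not r.barangay or not brgys:
        # extractOne returns None for an empty candidate list, so bail out early
        return r
    match = process.extractOne(r.barangay, brgys)
    if match is None:
        return r
    return Row(r.region, r.province, r.gadm_province, r.municipality, r.gadm_municipality, r.barangay, match[0])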

In [None]:
# pass the existing schema so rows rebuilt positionally in map_brgys keep their column names
lookup = lookup.rdd.map(map_brgys).toDF(lookup.schema)
lookup.createOrReplaceTempView('lookup')

In [None]:
spark.sql('select count(*) from lookup where gadm_brgy is null').show()

In [None]:
spark.sql('select * from lookup limit 5').show()

In [None]:
lookup.count()

## final precinct code lookup

In [None]:
spark.sql('select count(distinct precinct_code) from location').show()

In [None]:
spark.sql('select count(*) from lookup').show()

In [None]:
spark.sql('select distinct * from lookup').count()

In [None]:
lookup.show(2)

In [None]:
final_lookup = spark.sql("""
            SELECT DISTINCT
                l.precinct_code, 
                k.region,
                k.gadm_province, 
                k.gadm_municipality, 
                k.gadm_brgy
            FROM location l
            LEFT JOIN lookup k
                ON k.province = l.province
                AND k.municipality = l.municipality
                AND k.barangay = l.barangay
""")
final_lookup.createOrReplaceTempView('final_lookup')
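
Comparing row counts before and after this join is a quick way to spot fan-out: if a (province, municipality, barangay) key appears more than once in `lookup` with different GADM labels, every precinct with that key is repeated in `final_lookup`. A minimal, Spark-free sketch of that check follows; `duplicated_keys` is a hypothetical helper and the rows are placeholders, not real data.

```python
from collections import Counter

# Placeholder lookup rows: (province, municipality, barangay, gadm_brgy)
lookup_rows = [
    ("PROVINCE A", "TOWN 1", "BRGY X", "gadm_label_1"),
    ("PROVINCE A", "TOWN 2", "BRGY Y", "gadm_label_2"),
    ("PROVINCE A", "TOWN 2", "BRGY Y", "gadm_label_3"),
]

def duplicated_keys(rows, key_len=3):
    """Return join keys that occur more than once, with their counts."""
    counts = Counter(row[:key_len] for row in rows)
    return {key: n for key, n in counts.items() if n > 1}

dupes = duplicated_keys(lookup_rows)
print(dupes)
```

Any key reported here would need to be resolved (for example by keeping a single GADM label per key) before the precinct counts can line up.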

# consolidate lookup information

In [None]:
spark.sql('show tables').show()

## from the location tables
* `final_lookup` has the clean GADM tags for region, province, municipality and brgy
* `lookup` has the mapping for unclean --> clean geotags
* `psgc` has additional location information such as land area, population, etc. 

In [None]:
spark.sql('select * from final_lookup limit 5').show()

In [None]:
spark.sql('select * from location limit 5').show()

In [None]:
spark.sql('select * from psgc limit 5').toPandas()

In [None]:
spark.sql('select * from president limit 5').toPandas()

In [None]:
final_lookup.count()

In [None]:
spark.sql('''select * from lookup 
          where province = "SORSOGON" 
          and municipality="SORSOGON CITY" and barangay="POBLACION"''').show()

In [None]:
spark.sql('select * from location where precinct_code = 62160095').show()

## save lookup tables

In [None]:
def save_query_to_csv(query, filename):
    """Run `query` and write the result as a single-part CSV folder under ../output."""
    path = os.path.join('..', 'output', filename)
    # check for an existing folder up front instead of treating every error as "already exists"
    if os.path.exists(path):
        inp = input('file already exists! retry save? Y/N')
        if inp != 'Y':
            return
        shutil.rmtree(path)
    spark.sql(query).coalesce(1).write.option("header", "true").csv(path)
    print('file saved!')

In [None]:
save_query_to_csv('select * from final_lookup', 'lookup')

In [None]:
save_query_to_csv('''select 
                precinct_code, 
                registered_voters, 
                ballots_cast 
            from location''', 'location')

In [None]:
save_query_to_csv('''select precinct_code,
              registered_voters, 
              ballots_cast, 
              is_city, 
              CASE WHEN SUBSTRING(income_class, 0,3) = "-"
                  THEN NULL
                  ELSE SUBSTRING(income_class, 0,3)
                END as income_class,
              population, 
              land_area 
          from psgc''', 'psgc')
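
The `CASE` expression above keeps the first three characters of `income_class` and turns a bare `-` into `NULL`. A plain-Python equivalent is sketched below, assuming Spark treats a start position of 0 in `SUBSTRING` like position 1; `clean_income_class` is a hypothetical helper name.

```python
def clean_income_class(value):
    """Keep the leading class label (e.g. '2nd') and map the '-' placeholder to None."""
    if value is None:
        return None
    prefix = value[:3]
    return None if prefix == "-" else prefix

samples = ["2nd Class", "3rd Class", "-", None]
cleaned = [clean_income_class(v) for v in samples]
print(cleaned)
```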

# Denormalized results tables

In [None]:
for table in ['president', 'vice_president']:
    spark.sql(f"""
                SELECT 
                    f.region, 
                    f.gadm_province, 
                    f.gadm_municipality, 
                    f.gadm_brgy,
                    p.precinct_code, 
                    p.contest_code, 
                    p.candidate_name, 
                    p.party_code,
                    p.votes, 
                    p.col5, 
                    p.ballots_cast, 
                    p.col7, 
                    p.col8, 
                    p.pct_votes
                FROM {table} p
                JOIN final_lookup f 
                ON p.precinct_code = f.precinct_code

    """).createOrReplaceTempView(f'denorm_{table[:4]}')

In [None]:
spark.sql('select * from denorm_vice limit 5').toPandas()

In [None]:
spark.sql('select * from denorm_pres limit 5').toPandas()
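
Because the denormalized tables are built with an inner `JOIN`, any result row whose `precinct_code` is missing from `final_lookup` is dropped silently. The sketch below shows the same join in plain Python together with a list of unmatched precincts. `denormalize` is a hypothetical helper; the first lookup entry reuses a precinct shown earlier with GADM-style capitalisation assumed, and the result rows are made up.

```python
# Lookup keyed by precinct_code (GADM capitalisation is an assumption).
final_lookup = {
    "55170026": ("REGION I", "Pangasinan", "Calasiao", "Buenlag"),
}
# Made-up result rows; candidate names and vote counts are placeholders.
results = [
    {"precinct_code": "55170026", "candidate_name": "CANDIDATE A", "votes": "10"},
    {"precinct_code": "99999999", "candidate_name": "CANDIDATE A", "votes": "5"},
]

def denormalize(rows, lookup):
    """Inner-join result rows to the lookup and collect precincts with no match."""
    joined, unmatched = [], set()
    for row in rows:
        location = lookup.get(row["precinct_code"])
        if location is None:
            unmatched.add(row["precinct_code"])
            continue
        region, province, municipality, brgy = location
        joined.append({"region": region, "gadm_province": province,
                       "gadm_municipality": municipality, "gadm_brgy": brgy, **row})
    return joined, unmatched

joined, unmatched = denormalize(results, final_lookup)
print(joined)
print(unmatched)
```

An empty `unmatched` set is a cheap confirmation that the denormalized table covers every precinct in the results file.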

In [None]:
for table in tqdm(['pres', 'vice']):
    save_query_to_csv(f'select * from denorm_{table}', f'denorm_{table}')

In [None]:
files = glob(os.path.join('..', 'output','*','*csv'))
files

In [None]:
ogfiles = list(map(lambda x: os.path.split(x)[-1], files))
ogfiles

In [None]:
filenames = list(map(lambda x: x.split(os.sep)[2]+r'.csv', files))
filenames

In [None]:
for path, filename in tqdm(zip(files, filenames)):
    try:
        ogfilename = os.path.split(path)[-1]
        shutil.move(path, os.path.join('..', 'output'))
        os.rename(os.path.join('..', 'output', ogfilename), os.path.join('..', 'output', filename))
        shutil.rmtree(os.path.join('..', 'output', filename.split('.')[0]))
    except FileNotFoundError:
        pass
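
Spark writes each CSV as a folder containing a `part-*.csv` file plus marker files, which is why the cells above move and rename the part file and then delete the folder. The same clean-up can be sketched as one function and tried on a temporary directory. `flatten_spark_csv` is a hypothetical helper, and it assumes a single part file per folder, as produced by `coalesce(1)`.

```python
import shutil
import tempfile
from pathlib import Path

def flatten_spark_csv(output_dir):
    """Turn each <name>/part-*.csv folder in output_dir into a single <name>.csv file."""
    output_dir = Path(output_dir)
    moved = []
    for part in sorted(output_dir.glob('*/part-*.csv')):
        folder = part.parent
        target = output_dir / f'{folder.name}.csv'
        shutil.move(str(part), str(target))  # rename the part file after its folder
        shutil.rmtree(folder)                # drop the folder and Spark's marker files
        moved.append(target.name)
    return moved

# Demonstration on a temporary directory that mimics Spark's output layout.
with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp) / 'lookup'
    folder.mkdir()
    (folder / 'part-00000-abc.csv').write_text('precinct_code,region\n')
    (folder / '_SUCCESS').write_text('')
    result = flatten_spark_csv(tmp)
    remaining = sorted(p.name for p in Path(tmp).iterdir())

print(result, remaining)
```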