In [None]:
import os
import sys
import csv
import glob
import json
import subprocess

# Integration

For each sub dataset, the data are spread over several files. As we have seen in part 1, for 3 sub datasets (fhv, green, yellow) the schema of the csv files has varied over time. This integration step will therefore make it possible to work later on data that all share the same schema.

In practice, for the 3 subdatasets, the most recent schema will be taken as the reference schema. All files with a different schema will be converted to their reference schema. This conversion can be a simple change of column name, but it can also be a function of several columns.

The first step will therefore be to identify these transformations.

### 1. Identification of the transformations

The 3 following figures represent the correspondences between the different schemas of the same sub dataset. A reference schema is composed of the entries in the blue boxes. The names or functions below them make it possible to retrieve or rebuild the data from a file built on a different schema. For example, for the FHV subdataset, the data _pulocationid_ can be found in a _pulocationid_ column but also in a _locationid_ column. In the green subdataset, this same data can be found in the _pulocationid_ column but can also be computed by a function of the _pickup_longitude_ and _pickup_latitude_ columns.

<img src="img/transformation_fhv.png" width="800" align="left"/>
<img src="img/transformation_green.png" width="800" align="left"/>
<img src="img/transformation_yellow.png" width="800" align="left"/>

### 2. Implementation

To perform these transformations, we decided to implement a very general function to avoid having to write a specific function for each dataset. The idea is that this function transforms a row of data that does not conform to the reference schema into a conforming row, driven by a json configuration file.

<img src="img/transformation_function.png" width="800" align="left"/>

The json configuration file is subdataset specific, i.e. for each subdataset a configuration file must be defined. These configurations describe the hierarchical diagrams shown above. Part of one of these files (green dataset) is shown below. The whole files are in the repository in the _config_files_ folder. 

```json
{ 
 "vendorid": [
              {
               "type": "column", 
               "content": "vendorid"
              },
              {
               "type": "column",
               "content": "vendor_id"
              }
             ],

 ...
 
 "store_and_fwd_flag": [
                        {
                         "type": "column", 
                         "content": "store_and_fwd_flag"
                        },
                        {
                         "type": "column", 
                         "content": "store_and_forward"
                        }
                       ],

 "pulocationid": [
                  {
                   "type": "column", 
                   "content": "pulocationid"
                  },
                  {
                   "type": "function",
                   "content": {
                               "func_name": "compute_location_id",
                               "params": ["pickup_longitude", "pickup_latitude"]
                              }
                  },
                  {
                   "type": "function",
                   "content": {
                               "func_name": "compute_location_id",
                               "params": ["start_lon", "start_lat"]
                              }
                  }
                 ], 
                         
 ...
}

```

In this json, the keys are the entries of the reference schema and the values are the _aliases_ of the key. An alias has a different name but represents, in another file, the same data as the key. There are several types of aliases:

* *column* : the data is retrieved from another column whose name is specified by _content_.
* *function* : the data is calculated from the data of several other columns. The name of the function and the names of the parameter columns are specified by _content_.

The function simply reads the json and loops through its keys. For each key, it checks whether the column name (or all the parameter names) of one of its aliases is present in the schema of the input file. If this is the case, it retrieves the associated data and copies/calculates the value to be associated with the key in the reference schema.

In our case, there will be column name changes and a single multi-column function that transforms geographic coordinates (latitude, longitude) into an area identifier. This function needs one more parameter besides the column names: a geopandas dataframe of geographical areas. The integration function must allow this parameter to be passed.

In [None]:
def integrate(data, schema, integration_conf, params_f):

    """
    Transforms a row of data into the reference schema
    by following the configuration

    For every column of the reference schema, the aliases are tried in
    the order of the configuration and the first usable one wins.

    :param data: list of original data (one value per entry of schema)
    :param schema: schema of original data (list of column names)
    :param integration_conf: dict with the configuration, mapping each
                             reference column to its list of aliases
    :param params_f: extra param forwarded to functions of columns
    :return: list of values ordered like the keys of integration_conf;
             a column without any usable alias gets ''
    :raises ValueError: if a configured func_name is not a plain identifier
    """

    data = dict(zip(schema, data))
    t_data = dict() # transformed data

    # loop through all the columns of the reference schema
    for column, alias_list in integration_conf.items():

        # value kept when there is no valid alias
        t_data[column] = ''

        # loop through all alias (column or function)
        for alias in alias_list:
            category = alias['type']
            content = alias['content']

            # the alias is an other column name
            if category == 'column':

                # check that this name is in the schema of the data
                if content in data:
                    t_data[column] = data[content]
                    break

            # the alias is a function with other column names as params
            elif category == 'function':

                func_name = content['func_name']
                param_names = content['params']

                # the function can only be evaluated
                # if all its params are in the schema of the data
                if not all(name in data for name in param_names):
                    continue

                # only a bare function name may reach eval(), so that the
                # configuration file cannot inject arbitrary expressions
                if not func_name.isidentifier():
                    raise ValueError(
                        "invalid function name in configuration: {!r}".format(func_name))

                params = [data[name] for name in param_names]

                # NOTE(review): assumes the transformation takes the column
                # values positionally, followed by params_f - confirm against
                # the functions that are actually configured.
                # The name is resolved in the scope of this module.
                t_data[column] = eval('{}(*params, params_f)'.format(func_name))
                break

    data = list(t_data.values())

    return data

The biggest advantage of this function is its flexibility. I.e. if in the future a schema transformation is different (an extra column, changing date format, new taxi zone, ...), the function will remain valid. We have in fact simply separated the fixed part from the variable part of the transformation. The fixed part is the code of the function and the variable part is passed as a parameter.

To make this flexibility possible, we use the python function _eval()_ which allows to evaluate python code passed as a string. For functions with several columns, a string "name_of_the_function(param1, param2, ...)" is constructed from the config file and passed to the _eval_ function.

In the rest of the project, we will still have to implement functions that work with rows. To group these functions, we define a static Row class where each of these functions is a static method. This class can be found in the _row.py_ file.

### 3. Implementation with spark

Since the integration of one row is independent of the integration of any other row, we can use spark to parallelize this operation.

1. read the csv file from HDFS
2. apply the _process_ function to the rows: it transforms a _string_ into a _list_
3. apply the _integrate_ function to the rows
4. apply the _join_ function to the rows: it transforms a _list_ into a _string_
5. write the rows to HDFS

<img src="img/spark.png" width="800" align="left"/>

In [None]:
# Tell Spark where the Hadoop configuration lives and which python
# interpreter the driver and the executors must use
os.environ['HADOOP_CONF_DIR']="/etc/hadoop/conf"

os.environ['PYSPARK_PYTHON']="/usr/local/anaconda3/bin/python"
os.environ['PYSPARK_DRIVER_PYTHON']="/usr/local/anaconda3/bin/python"

from pyspark.sql import SparkSession
from pyspark import SparkFiles


# remove old spark session (left over when this cell is executed again)
try:
    spark
except NameError:
    # first execution of the cell: there is no session to stop
    pass
else:
    print("Spark application already started. Terminating existing application and starting new one")
    try:
        spark.stop()
    except Exception as error:
        # stopping is best effort: report the problem and start a new session anyway
        print("Could not stop the existing Spark application: {}".format(error))

# Create a new spark session, with YARN as resource manager, requesting 4 executor instances.
spark = SparkSession \
    .builder \
    .master("yarn") \
    .config("spark.executor.instances","4") \
    .appName("project_ceci18") \
    .getOrCreate()

#When dealing with RDDs, we work the sparkContext object. See https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext
sc=spark.sparkContext

In [None]:
# spark configuration
# Register the project modules with the spark context, then put the
# SparkFiles root directory first on the import path
for shipped_module in ("./row.py", "./transformations.py"):
    sc.addFile(shipped_module)
sys.path.insert(0, SparkFiles.getRootDirectory())

In [None]:
from row import Row
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point

dataset = "green"

# build dict with configurations: one entry per json file,
# keyed by the file name without its '.json' extension
integration_confs = dict()
conf_filenames = sorted(glob.glob('/home/ceci18/INFOH600-project/integration_conf/*.json'))
for conf_filename in conf_filenames:
    with open(conf_filename, 'r') as f:
        integration_conf = json.load(f)
    dataset_name = os.path.basename(conf_filename)[:-5]
    integration_confs[dataset_name] = integration_conf

# get all the filename
# NOTE(review): the user directory differs between hdfs_path (hpda000034)
# and local_path (hpda00034) - confirm that both are intended
hdfs_path = 'hdfs://public00:8020/user/hpda000034/infoh600/sampled'
local_path = '/home/hpda00034/infoh600/sampled'
filenames = sorted(glob.glob("{}/{}_*.csv".format(local_path, dataset)))
filenames = [os.path.basename(filename) for filename in filenames]

# load the taxi zones and reproject them to EPSG:4326
zones = gpd.read_file("./shape_files/taxi_zones.shp")
#zones.set_geometry('geometry', crs=(u'epsg:'+str(4326)), inplace=True)
zones = zones.to_crs({'init':'epsg:4326'})

# get the last schema of the dataset
last_schema = list(integration_confs[dataset].keys())


# Integrate the files one by one: each file gets its own rdd and its own output
for filename in filenames:
    print(filename)
    # write the right schema on the first line
    final_rdd = sc.parallelize([','.join(last_schema)])

    # get the schema of the file from the header of the local copy
    # (the file is closed as soon as the header has been read)
    with open('{}/{}'.format(local_path, filename), 'r') as header_file:
        schema = header_file.readline().replace('\n', '')
    schema = schema.replace('"','').replace("'", '').lower().split(',')

    rdd = sc.textFile('{}/{}'.format(hdfs_path, filename))
    # Each original file contains also the header (schema).
    # zipWithIndex numbers the lines from 0, so the filter below drops
    # the lines with index 0 and 1.
    # NOTE(review): presumably the header plus a blank line following it
    # in the raw files - confirm, otherwise the first data row is lost
    rdd = rdd.zipWithIndex() \
             .filter(lambda x:x[1] > 1) \
             .repartition(4)\
             .map(lambda x: Row.process(x[0]))\
             .map(lambda x: Row.integrate(x, schema, integration_confs[dataset], zones))\
             .map(lambda x: Row.join(x))
    # add it to the rdd that holds the header line
    final_rdd = final_rdd.union(rdd)

    # saves this to HDFS in your home HDFS folder
    final_rdd.saveAsTextFile('./integrated/{}/{}'.format(dataset, filename[:-4]))

In [None]:
# Stop spark
# Stop the spark session if there is one
try:
    spark.stop()
except NameError:
    # no session was ever created: nothing to stop
    pass
except Exception as error:
    # stopping is best effort, but the failure should not stay hidden
    print("Could not stop the Spark application: {}".format(error))

We must then gather the partitions into a single file :

In [None]:
dataset = "green"
local_path = '/home/hpda00034/infoh600/sampled'
filenames = sorted(glob.glob("{}/{}_*.csv".format(local_path, dataset)))
filenames = [os.path.basename(filename) for filename in filenames]

# For each file: merge the partitions written by spark into one local file,
# remove the partition folder from HDFS, then move the merged file to HDFS
for filename in filenames:
    print(filename)
    hdfs_parts = './integrated/{}/{}'.format(dataset, filename[:-4])
    local_file = '../integrated/{}/{}'.format(dataset, filename)
    hdfs_folder = './integrated/{}/'.format(dataset)

    try:
        # check=True stops the sequence at the first failing command, so the
        # partitions are never removed when the merge did not succeed
        subprocess.run(['hadoop', 'fs', '-getmerge', hdfs_parts + '/*', local_file],
                       check=True)
        subprocess.run(['hadoop', 'fs', '-rm', '-r', hdfs_parts + '/'],
                       check=True)
        subprocess.run(['hadoop', 'fs', '-moveFromLocal', local_file, hdfs_folder],
                       check=True)
    except subprocess.CalledProcessError as error:
        # leave this file as it is and go on with the next one
        print("Skipping {}: command {} exited with code {}".format(
            filename, error.cmd, error.returncode))

In [71]:
from matplotlib import pyplot

dataset = "fhv"
path = "../invalid_data"
filenames = sorted(glob.glob("{}/{}/*.txt".format(path, dataset)))


# Sum the error counts of the report files, per file and per column
errors_by_file = {}
errors_by_column = {}
for filename in filenames:
    # read the whole report and close the file right away
    with open(filename, 'r') as f:
        lines = f.readlines()

    # NOTE(review): eval() executes each line as python code; this is only
    # acceptable because the reports are produced by this project. Consider
    # ast.literal_eval if the lines are plain literals.
    lines = [eval(line.replace('\n','')) for line in lines]

    # each evaluated line is used as a (message, count) pair; the key kept
    # for a line is the part of the message between position 8 and the
    # first comma
    comma_indexs = [line[0].index(',') for line in lines]
    lines = [(line[0][8:index], line[1]) for line, index in zip(lines, comma_indexs)]
    count = sum([line[1] for line in lines])

    # only the files that contain errors are reported
    if count != 0:
        errors_by_file[filename] = count

    for name, number in lines:
        errors_by_column[name] = errors_by_column.get(name, 0) + number

print(errors_by_column)
print(errors_by_file)
        

{'sr_flag': 150349, 'dispatching_base_num': 13}
{'../invalid_data/fhv/fhv_tripdata_2019-01.txt': 150357, '../invalid_data/fhv/fhv_tripdata_2019-02.txt': 5}


In [89]:
import pandas as pd
from row import Row
from pandas_schema import Column, Schema
import pandas_schema.validation as validation 

df = pd.read_csv('../green_tripdata_2013-09.csv')

# rename the columns of the file to the names used by the validation schema
columns = [
    'vendorid', 'lpep_pickup_datetime', 'lpep_dropoff_datetime',
    'store_and_fwd_flag', 'ratecodeid', 'pulocationid', 'dolocationid',
    'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax',
    'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge',
    'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge',
]
df.columns = columns

df.vendorid.head()


# validation rules for green dataset: (column name, factory of its validator),
# listed in schema order; a factory is used so that every column gets its own
# validator object
green_rules = [
    ('vendorid', lambda: validation.InListValidation([1, 2])),
    ('lpep_pickup_datetime', lambda: validation.DateFormatValidation('%Y-%m-%d %H:%M:%S')),
    ('lpep_dropoff_datetime', lambda: validation.DateFormatValidation('%Y-%m-%d %H:%M:%S')),
    ('store_and_fwd_flag', lambda: validation.InListValidation(["Y", "N"])),
    ('ratecodeid', lambda: validation.InListValidation([1, 2, 3, 4, 5, 6])),
    ('pulocationid', lambda: validation.InRangeValidation(0, 266)),
    ('dolocationid', lambda: validation.InRangeValidation(0, 266)),
    ('passenger_count', lambda: validation.InRangeValidation(min=0)),
    ('trip_distance', lambda: validation.InRangeValidation(min=0)),
    ('fare_amount', lambda: validation.InRangeValidation(min=0)),
    ('extra', lambda: validation.InRangeValidation(min=0)),
    ('mta_tax', lambda: validation.InRangeValidation(min=0)),
    ('tip_amount', lambda: validation.InRangeValidation(min=0)),
    ('tolls_amount', lambda: validation.InRangeValidation(min=0)),
    ('ehail_fee', lambda: validation.InRangeValidation(min=0)),
    ('improvement_surcharge', lambda: validation.InRangeValidation(min=0)),
    ('total_amount', lambda: validation.InRangeValidation(min=0)),
    ('payment_type', lambda: validation.InListValidation([1, 2, 3, 4, 5, 6])),
    ('trip_type', lambda: validation.InListValidation([1, 2])),
    ('congestion_surcharge', lambda: validation.InRangeValidation(min=0)),
]

# validation schema for green dataset: every column accepts empty values
schema_green = Schema([
    Column(name, [make_validator()], allow_empty=True)
    for name, make_validator in green_rules
])

# validate the dataframe and print every error that was found
errors = schema_green.validate(df)

for error in errors:
    print(error)
