In [107]:
%%javascript
// Make the classic-notebook OutputArea hook `_should_scroll` always answer
// false, i.e. (presumably) never collapse long cell outputs into a scroll box.
IPython.OutputArea.prototype._should_scroll = (lines) => false;

<IPython.core.display.Javascript object>

In [82]:
import ast
import gzip
import io
import json
import os
import re
import shutil

import numpy as np
import pandas as pd
from pyspark import SparkConf, SparkContext

from ineqpy import atkinson

## Reading statistics from the per-taxi metrics

Adding `run_id` to the dictionary of results.

In [29]:
def add_run_id(fname, d, mode="per_taxi"):
    """
    Add the run id encoded in a Spark input filename to a result dict.

    Parameters
    ----------
    fname : str
        Path/URI of the input file as reported by Spark (only the part after
        the last ``/`` is used).
    d : dict
        Result dictionary; it is modified in place.
    mode : {"per_taxi", "aggregates"}
        Naming scheme of ``fname``; selects the suffix that is stripped
        (``_per_taxi_metrics.json.gz`` or ``_aggregates.csv.gz``).

    Returns
    -------
    dict
        The same dict ``d``, with ``d["run_id"]`` set to the file basename
        without the mode-specific suffix and without a leading ``run_``.

    Raises
    ------
    ValueError
        If ``mode`` is not one of the supported values (previously such a
        call silently returned ``d`` without a ``run_id`` key).
    """
    suffixes = {
        "per_taxi": "_per_taxi_metrics.json.gz",
        "aggregates": "_aggregates.csv.gz",
    }
    if mode not in suffixes:
        raise ValueError(
            "unknown mode {!r}; expected one of {}".format(mode, sorted(suffixes))
        )
    basename = fname.split('/')[-1]
    # re.escape: the dots in the suffix must match literally, not "any character"
    without_suffix = re.sub(re.escape(suffixes[mode]) + '$', '', basename)
    d["run_id"] = re.sub('^run_', '', without_suffix)
    return d

Here we compute the Atkinson index of the incomes. Other statistics (e.g. online ratios) could also be computed from the same records in the same pass.

In [3]:
def calc_atkinson(t):
    """
    Compute the Atkinson inequality index of the incomes in one result record.

    The incomes are taken from ``t["trip_avg_price"]`` if that key exists,
    otherwise from ``t["trip_income"]``; each value is converted with
    ``float``. Returns ``{"run_id": ..., "atkinson": ...}``, where the index
    is ``None`` when neither key is present. Raises ``KeyError`` if ``t``
    has no ``run_id``.
    """
    run_id = t["run_id"]
    for income_key in ("trip_avg_price", "trip_income"):
        if income_key in t:
            incomes = np.array([float(value) for value in t[income_key]])
            return {"run_id": run_id, "atkinson": atkinson(incomes)}
    return {"run_id": run_id, "atkinson": None}

In [4]:
# Spark settings
conf = SparkConf().set("spark.executor.memory", "20g")

# removing old output directory; ignore_errors so that a first run, where
# "atkinson/" does not exist yet, does not raise FileNotFoundError
shutil.rmtree("atkinson", ignore_errors=True)

# initializing a SparkContext object
sc = SparkContext(conf=conf)

try:
    # defining a Spark RDD and saving the results into a textfile
    (
        sc.wholeTextFiles('results/run_2019*_per_taxi_metrics.json.gz')
        # the record of interest is the last non-empty line of each file
        # (robust to a missing or extra trailing newline)
        .map(lambda t: add_run_id(t[0], json.loads(t[1].strip().splitlines()[-1])))
        .map(calc_atkinson)
        # write real JSON (None -> null) instead of Python dict reprs;
        # default=float converts numpy scalars json cannot serialize natively
        .map(lambda d: json.dumps(d, default=float))
        .saveAsTextFile("atkinson")
    )
finally:
    # always release the context: a failed job would otherwise leave it
    # running and the next SparkContext(conf=conf) in this notebook would fail
    sc.stop()

In [100]:
# collecting results from Spark
def _parse_spark_record(line):
    """
    Parse one line of a Spark ``saveAsTextFile`` part file into a dict.

    Lines are parsed as JSON first; if that fails they are treated as a
    Python dict repr (``str(dict)``) and parsed with ``ast.literal_eval``,
    which also accepts single quotes and ``None``.
    """
    try:
        return json.loads(line)
    except ValueError:
        return ast.literal_eval(line)


atkinson_records = []
for part_name in sorted(os.listdir('atkinson/')):
    # only the part-* files hold data (skip _SUCCESS and similar markers)
    if not part_name.startswith('part-'):
        continue
    with open(os.path.join('atkinson', part_name)) as part_file:
        atkinson_records.extend(
            _parse_spark_record(line.strip()) for line in part_file if line.strip()
        )

atkinson_df = pd.DataFrame(atkinson_records)

## Reading the configuration data

In [45]:
# one JSON object per line of the gzipped config dump
with gzip.open('configs/2019_all.conf.gz') as config_file:
    configs = [json.loads(raw_line) for raw_line in config_file]

configs_df = pd.DataFrame.from_dict(configs)

## Reading the aggregate csv files

In [89]:
# removing old output directory (if any) so that saveAsTextFile can recreate it;
# ignore_errors avoids a FileNotFoundError on the very first run
shutil.rmtree("temp_data/aggregates", ignore_errors=True)

In [91]:
# initializing a SparkContext object; getOrCreate reuses a context that is
# still active (e.g. after a failed cell) instead of raising, so the cell can
# be re-run. NOTE: an already-active context keeps its own configuration.
sc = SparkContext.getOrCreate(conf=conf)

In [92]:
def _last_row_with_run_id(fname, content):
    """
    Return the last row of one aggregates CSV as a dict tagged with its run_id.

    ``content`` is the file text delivered by ``wholeTextFiles``. Parsing it
    directly avoids re-reading ``fname`` on the executor and replaces
    ``DataFrame.from_csv``, which was removed in pandas 1.0. ``index_col=0``
    reproduces ``from_csv``'s default of using the first column as the index,
    so that column is not part of the returned values.
    """
    frame = pd.read_csv(io.StringIO(content), index_col=0)
    return add_run_id(fname, frame.iloc[-1].to_dict(), mode="aggregates")


# defining a Spark RDD and saving the results into a textfile.
# The glob only matches *_aggregates.csv.gz, the only name pattern that
# add_run_id(mode="aggregates") can turn into a run_id.
sc.wholeTextFiles('results/run_2019*_aggregates.csv.gz')\
.map(lambda t: _last_row_with_run_id(t[0], t[1]))\
.saveAsTextFile("temp_data/aggregates")

In [93]:
# stopping the SparkContext; the remaining cells only post-process the
# written files locally (bash + pandas) and do not need Spark
sc.stop()

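The Spark job above writes each record as a Python dict repr (single quotes, bare `nan`), which is not valid JSON. I could not find a convenient way to convert this in Python, so the small preprocessing step below does it in bash with `sed`, `grep` and `jq`. It writes one JSON object per line to `aggregates.json`.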

In [94]:
%%bash 

# Convert the Python dict reprs written by the Spark job (which saves to
# temp_data/aggregates) into JSON Lines:
#  - single quotes -> double quotes
#  - bare `nan` values -> null (only whole values, not substrings of keys)
#  - drop records starting with the float key 79.0 (NOTE(review): presumably
#    not aggregate rows - confirm)
#  - validate/normalize each record with jq
sed -e 's/\x27/"/g' -e 's/: nan\([,}]\)/: null\1/g' temp_data/aggregates/part-* \
  | grep -v '{79.0:' \
  | jq -c '.' > aggregates.json

In [95]:
# collecting the preprocessed aggregates (one JSON object per line);
# `with` closes the file, blank lines are skipped
with open('aggregates.json') as aggregates_file:
    aggregates_df = pd.DataFrame(
        [json.loads(line) for line in aggregates_file if line.strip()]
    )

## Merging and cleaning the joined results

In [None]:
# setting run_ids as index for all three dataframes.
# Re-runnable: once run_id has become the index, every step is a no-op.
def _indexed_by_run_id(frame):
    """Return ``frame`` indexed by its ``run_id`` column, or unchanged if it has no such column."""
    if 'run_id' in frame.columns:
        return frame.set_index('run_id')
    return frame


if 'run_id' in configs_df.columns:
    # config run_ids contain `.conf`, the run_ids taken from result filenames
    # do not; raw string because '\.' is an invalid escape in a plain literal
    configs_df['run_id'] = configs_df['run_id'].map(lambda s: re.sub(r'\.conf', '', s))
configs_df = _indexed_by_run_id(configs_df)
atkinson_df = _indexed_by_run_id(atkinson_df)
aggregates_df = _indexed_by_run_id(aggregates_df)

In [103]:
# joining the different results on run_id: configs on the left (left join,
# so every config row is kept); overlapping columns from aggregates_df get
# a "_2" suffix
merged = (
    configs_df
    .join(atkinson_df)
    .join(aggregates_df, rsuffix="_2")
)

In [104]:
# dropping one row that belongs to the base config
# (errors='ignore' keeps the cell re-runnable once the row is gone)
merged = merged.drop('2019_02_14_base', errors='ignore')
# adding a geom column: the digits following `geom_` in the run_id
# (NaN for run_ids without it, instead of silently copying the whole id)
merged["geom"] = merged.index.to_series().str.extract(r'geom_(\d+)', expand=False)

In [111]:
def select_not_nan(x,y):
    """
    Pick the first of two values that is not missing.

    Returns ``x`` unless ``pd.isnull(x)`` is true; in that case ``y`` is
    returned as-is, so the result is missing only when ``y`` is missing too.
    """
    return y if pd.isnull(x) else x

In [117]:
# merging names from old and new code to one single column:
# missing `<prefix>_trip_avg_price` values are filled from
# `<prefix>_trip_income`, then the `_trip_income` column is dropped.
# Re-runnable: the step is skipped once `_trip_income` is gone.
for prefix in ['avg', 'std']:
    price_col = prefix + '_trip_avg_price'
    income_col = prefix + '_trip_income'
    if income_col not in merged.columns:
        continue
    if price_col in merged.columns:
        merged[price_col] = merged[price_col].combine(merged[income_col], select_not_nan)
    else:
        # no run reported the `_trip_avg_price` name at all
        merged[price_col] = merged[income_col]
    merged = merged.drop(income_col, axis=1)

In [118]:
# deleting unnecessary columns; "avg_request_lengths_2" is the duplicate
# produced by the rsuffix="_2" join. errors='ignore' keeps the cell
# re-runnable (NOTE: it also silently skips misspelled names).
merged = merged.drop([
    "show_plot",
    "show_map_labels",
    "show_pending",
    "avg_timestamp",
    "std_timestamp",
    "0",
    "avg_request_lengths_2"
], axis=1, errors='ignore')

In [119]:
# saving results as gzip-compressed CSV (run_id index included);
# create the output directory first so a fresh checkout does not fail
os.makedirs('notebooks', exist_ok=True)
merged.to_csv('notebooks/results.csv.gz', compression='gzip')