# Chapter 3

If you used `git lfs` to download the sample data files, you will have pulled down the `sliced.parquet` file. Run the next cell to generate the CSV file from it before moving on!

If you already have the `sliced.csv` file, you can skip that cell.

In [1]:
import pyarrow as pa
import pyarrow.csv
import pyarrow.parquet as pq

# write out csv version of the parquet file that is provided in the repo
sliced = pq.read_table('../sample_data/sliced.parquet')
pa.csv.write_csv(sliced, '../sample_data/sliced.csv')

In [2]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

conf = pyspark.SparkConf()
conf.set("spark.executor.memory", "8g")
conf.set("spark.driver.memory", "8g")
# hadoop-aws provides the s3a:// filesystem connector
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.1')
sc = SparkContext(conf=conf)

# read public S3 buckets anonymously (no AWS credentials needed)
sc._jsc.hadoopConfiguration().set('fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')
sc._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')

spark = SparkSession(sc)

In [3]:
%%time
import pyarrow as pa
import pyarrow.csv
import pyarrow.parquet as pq

pdf = pa.csv.read_csv('../sample_data/sliced.csv').to_pandas()

CPU times: user 4.82 s, sys: 3.61 s, total: 8.43 s
Wall time: 801 ms


In [4]:
%time df = spark.read.format('csv').load('../sample_data/sliced.csv', inferSchema='true', header='true')

CPU times: user 0 ns, sys: 151 ms, total: 151 ms
Wall time: 6.34 s


In [5]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [6]:
%time df = spark.createDataFrame(pa.csv.read_csv('../sample_data/sliced.csv').to_pandas())

CPU times: user 5.52 s, sys: 4.11 s, total: 9.63 s
Wall time: 2.47 s


In [7]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [8]:
%%time
df = spark.read.format('parquet').load('../sample_data/sliced.parquet') # using pyspark native reader
df.describe().show()

+-------+-------------------+------------------+-----------------+-------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+------------------+---------------------+------------------+
|summary|           VendorID|   passenger_count|    trip_distance|   pickup_longitude|  pickup_latitude|        RateCodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|      payment_type|       fare_amount|             extra|             mta_tax|        tip_amount|      tolls_amount|improvement_surcharge|      total_amount|
+-------+-------------------+------------------+-----------------+-------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+------------------+---------------------+------

In [9]:
%%time
df = spark.createDataFrame(pq.read_table('../sample_data/sliced.parquet').to_pandas()) # using pyarrow
df.describe().show()

+-------+-------------------+------------------+-----------------+-------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+------------------+-------------------+---------------------+------------------+
|summary|           VendorID|   passenger_count|    trip_distance|   pickup_longitude|  pickup_latitude|        RateCodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|      payment_type|      fare_amount|             extra|            mta_tax|        tip_amount|       tolls_amount|improvement_surcharge|      total_amount|
+-------+-------------------+------------------+-----------------+-------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+------------------+-------------------+---------------------+---------

In [10]:
%%timeit 
df = spark.read.format('csv').load(
    '../sample_data/sliced.csv', 
    inferSchema='true', 
    header='true').select('VendorID', 
                          'tpep_pickup_datetime', 
                          'passenger_count', 
                          'tip_amount', 
                          'fare_amount', 
                          'total_amount')

2.06 s ± 66.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [11]:
%%time
df = spark.createDataFrame(
    pa.csv.read_csv(
        '../sample_data/sliced.csv',
        convert_options=pa.csv.ConvertOptions(
            include_columns=['VendorID', 'tpep_pickup_datetime',
                             'passenger_count', 'tip_amount',
                             'fare_amount', 'total_amount'])
    ).to_pandas())

CPU times: user 3.24 s, sys: 2.48 s, total: 5.72 s
Wall time: 2.07 s


In [12]:
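import pandas as pd
# the wildcard import brings in PySpark SQL functions such as `month`,
# `to_timestamp` and `col` (plus `struct`, `collect_list`, `udf` and
# `explode`, used in the next cell); note that it also shadows Python
# builtins such as `sum`, `min`, `max` and `round`
from pyspark.sql.functions import *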

df = df.withColumn('tpep_pickup_datetime',
                   to_timestamp(col('tpep_pickup_datetime'), 
                  'yyyy-MM-dd HH:mm:ss')) # the datetime format
df = df.withColumn('pickup_month', 
                   month(col('tpep_pickup_datetime')))
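Spark's `to_timestamp` takes a Java-style datetime pattern (`yyyy-MM-dd HH:mm:ss`), while pandas and Python use `strftime` codes for the same layout (`%Y-%m-%d %H:%M:%S`). For comparison, a sketch of the same two steps in pandas, on illustrative strings:

```python
import pandas as pd

pdf = pd.DataFrame({'tpep_pickup_datetime': ['2015-01-10 20:33:38',
                                             '2015-02-03 07:15:00']})

# strftime equivalent of Spark's 'yyyy-MM-dd HH:mm:ss' pattern
pdf['tpep_pickup_datetime'] = pd.to_datetime(pdf['tpep_pickup_datetime'],
                                             format='%Y-%m-%d %H:%M:%S')
# counterpart of pyspark's month(): 1 = January ... 12 = December
pdf['pickup_month'] = pdf['tpep_pickup_datetime'].dt.month
print(pdf)
```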


In [13]:
%%time

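from pyspark.sql.types import *

group_columns = ['VendorID', 'pickup_month']
# use `c` rather than `col` so the name does not read as pyspark's col()
non_group_cols = [c for c in df.columns if c not in group_columns]
s = StructType([f for f in df.schema.fields if f.name in non_group_cols])
cols = [col(name) for name in non_group_cols]

# pack each row's non-grouping columns into one struct, then collect every
# struct belonging to a (VendorID, pickup_month) group into a single list
df_norm = df.withColumn('values', struct(*cols))
df_norm = (df_norm.groupBy(*group_columns)
           .agg(collect_list(col('values')).alias('values')))

# output element type: the original fields plus the normalized tip column v3
s2 = StructType(s.fields + [StructField('v3', DoubleType())])

@udf(ArrayType(s2))
def normalize(values):
    # z-score of tip_amount within the group (pandas .std() uses ddof=1)
    v1 = pd.Series([r.tip_amount for r in values])
    v1_norm = (v1 - v1.mean()) / v1.std()
    # a Row is a tuple, so appending a 1-tuple adds the v3 field
    return [values[i] + (float(v1_norm[i]),) for i in range(len(values))]

# swap the list for its normalized version, then emit one row per element
df_norm = (df_norm.withColumn('new_values', normalize(col('values')))
           .drop('values')
           .withColumn('new_values', explode(col('new_values'))))

# unpack the struct fields back into top-level columns
for c in [f.name for f in s2.fields]:
    df_norm = df_norm.withColumn(c, col('new_values.{0}'.format(c)))

df_norm = df_norm.drop('new_values')
df_norm.show()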


+--------+------------+--------------------+---------------+----------+-----------+------------+--------------------+
|VendorID|pickup_month|tpep_pickup_datetime|passenger_count|tip_amount|fare_amount|total_amount|                  v3|
+--------+------------+--------------------+---------------+----------+-----------+------------+--------------------+
|       1|           1| 2015-01-10 20:33:38|              1|       2.0|       14.5|        17.8| 0.18341020244403536|
|       1|           1| 2015-01-10 20:33:38|              1|       0.0|        9.5|        10.8| -0.6349579704225042|
|       1|           1| 2015-01-10 20:33:39|              1|       0.0|        3.5|         4.8| -0.6349579704225042|
|       1|           1| 2015-01-10 20:33:39|              1|       0.0|       15.0|        16.3| -0.6349579704225042|
|       1|           1| 2015-01-10 20:33:39|              1|       6.7|       27.0|       40.33|  2.1065754086804036|
|       1|           1| 2015-01-10 20:33:39|            
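Both the UDF above and the `applyInPandas` version below compute a per-group z-score of `tip_amount`. Written out for a group $g$ (one `VendorID`/`pickup_month` pair) with $n_g$ rows, where $x_i$ is a row's `tip_amount` and $z_i$ is the value stored in `v3`, and noting that pandas' `Series.std()` defaults to the sample standard deviation (`ddof=1`):

$$
\bar{x}_g = \frac{1}{n_g}\sum_{i \in g} x_i,
\qquad
s_g = \sqrt{\frac{1}{n_g - 1}\sum_{i \in g}\left(x_i - \bar{x}_g\right)^2},
\qquad
z_i = \frac{x_i - \bar{x}_g}{s_g}
$$

For a group with a single row, $n_g - 1 = 0$, so pandas returns `NaN` for $s_g$ and `v3` is `NaN` for that row.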

In [14]:
%%time

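schema = StructType(df.schema.fields + [StructField('v3', DoubleType())])

def vector_normalize(group_pdf):
    # group_pdf is a pandas DataFrame holding all rows of one
    # (VendorID, pickup_month) group; add the z-score of tip_amount as v3
    v1 = group_pdf.tip_amount
    group_pdf['v3'] = (v1 - v1.mean()) / v1.std()
    return group_pdf

group_columns = ['VendorID', 'pickup_month']
# Spark calls vector_normalize once per group; the returned frame must match `schema`
df_pandas_norm = df.groupby(*group_columns).applyInPandas(vector_normalize, schema=schema)
df_pandas_norm.show()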


+--------+--------------------+---------------+----------+-----------+------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|passenger_count|tip_amount|fare_amount|total_amount|pickup_month|                  v3|
+--------+--------------------+---------------+----------+-----------+------------+------------+--------------------+
|       1| 2015-01-10 20:33:38|              1|       2.0|       14.5|        17.8|           1| 0.18341020244403536|
|       1| 2015-01-10 20:33:38|              1|       0.0|        9.5|        10.8|           1| -0.6349579704225042|
|       1| 2015-01-10 20:33:39|              1|       0.0|        3.5|         4.8|           1| -0.6349579704225042|
|       1| 2015-01-10 20:33:39|              1|       0.0|       15.0|        16.3|           1| -0.6349579704225042|
|       1| 2015-01-10 20:33:39|              1|       6.7|       27.0|       40.33|           1|  2.1065754086804036|
|       1| 2015-01-10 20:33:39|              1|       0.
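Because the function handed to `applyInPandas` is ordinary pandas code, it can be checked locally before Spark is involved. The sketch below emulates the per-group calls with a plain pandas loop over groups; it is an approximation for testing only (Spark also handles partitioning, Arrow conversion and schema enforcement), and the sample rows are made up.

```python
import pandas as pd


def vector_normalize(group_pdf: pd.DataFrame) -> pd.DataFrame:
    # same logic as the cell above: z-score of tip_amount within one group
    v1 = group_pdf.tip_amount
    group_pdf['v3'] = (v1 - v1.mean()) / v1.std()
    return group_pdf


sample = pd.DataFrame({
    'VendorID':     [1, 1, 1, 2, 2],
    'pickup_month': [1, 1, 1, 1, 1],
    'tip_amount':   [2.0, 0.0, 4.0, 1.0, 3.0],
})

# call the function once per (VendorID, pickup_month) group, as applyInPandas would;
# .copy() keeps the function's in-place column assignment off the original frame
result = pd.concat(
    [vector_normalize(g.copy()) for _, g in sample.groupby(['VendorID', 'pickup_month'])]
)
print(result)
```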

## IMPORTANT

Run the next cell before opening the `perspective.ipynb` notebook. After the cell finishes, you may need to refresh this browser window before the Perspective widget will load.

You may also need to click the puzzle-piece icon in the sidebar and click `Enable` to allow third-party Jupyter extensions; otherwise the widgets will not display.

In [15]:
!pip install ipywidgets
!pip install perspective-python
!jupyter labextension install @jupyter-widgets/jupyterlab-manager
!jupyter labextension install @finos/perspective-jupyterlab
!jupyter nbextension enable --py widgetsnbextension --sys-prefix

Building jupyterlab assets (production, minimized)
Building jupyterlab assets (production, minimized)
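After the install cell, one quick way to confirm that the Python packages are importable from this kernel is `importlib.util.find_spec`. A minimal sketch; it assumes the import names `ipywidgets` and `perspective` for the two pip packages, and it does not check whether the JupyterLab extensions themselves are enabled:

```python
import importlib.util


def check_modules(names):
    """Map each top-level module name to True if it can be imported in this kernel."""
    return {name: importlib.util.find_spec(name) is not None for name in names}


status = check_modules(['ipywidgets', 'perspective'])
for name, ok in status.items():
    print(f"{name}: {'found' if ok else 'MISSING'}")
```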
