# Imports and Initial Setup

In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructField, StructType, DateType, TimestampType, StringType, IntegerType, FloatType
import pyspark.sql.functions as f
from PySparkAudit import counts, describe, freq_items, corr_matrix

In [2]:
sc = SparkContext('local')
spark = SparkSession(sc)

# Load Data

### NYC Citi Bike Trip Data (February 2016)

The data includes:

* Trip Duration (seconds)
* Start Time and Date
* Stop Time and Date
* Start Station Name
* End Station Name
* Station ID
* Station Lat/Long
* Bike ID
* User Type (Customer = 24-hour pass or 3-day pass user; Subscriber = Annual Member)
* Gender (0=unknown; 1=male; 2=female)
* Year of Birth

In [3]:
citibike_schema = StructType([
    StructField('Trip Duration', IntegerType(), nullable=True),
    StructField('Start Time', TimestampType(), nullable=True),
    StructField('Stop Time', TimestampType(), nullable=True),
    StructField('Start Station ID', IntegerType(), nullable=True),
    StructField('Start Station Name', StringType(), nullable=True),
    StructField('Start Station Latitude', FloatType(), nullable=True),
    StructField('Start Station Longtitude', FloatType(), nullable=True),
    StructField('End Station ID', IntegerType(), nullable=True),
    StructField('End Station Name', StringType(), nullable=True),
    StructField('End Station Latitude', FloatType(), nullable=True),
    StructField('End Station Longtitude', FloatType(), nullable=True),
    StructField('Bike ID', IntegerType(), nullable=True),
    StructField('User Type', StringType(), nullable=True),
    StructField('Birth Year', IntegerType(), nullable=True),
    StructField('Gender', IntegerType(), nullable=True)
])

In [4]:
citibike_df = spark.read.csv('JC-20162-citibike-tripdata.csv', header='true', schema=citibike_schema, timestampFormat="yyyy-MM-dd HH:mm:ss", ignoreLeadingWhiteSpace='true', ignoreTrailingWhiteSpace='true')

In [5]:
citibike_df.limit(5).toPandas()

Unnamed: 0,Trip Duration,Start Time,Stop Time,Start Station ID,Start Station Name,Start Station Latitude,Start Station Longtitude,End Station ID,End Station Name,End Station Latitude,End Station Longtitude,Bike ID,User Type,Birth Year,Gender
0,361,2016-02-01 00:31:18,2016-02-01 00:37:19,3202,Newport PATH,40.727222,-74.03376,3203,Hamilton Park,40.727596,-74.04425,24393,Subscriber,1975,1
1,297,2016-02-01 01:55:05,2016-02-01 02:00:02,3195,Sip Ave,40.730743,-74.063782,3194,McGinley Square,40.725342,-74.067619,24394,Subscriber,1985,2
2,1155,2016-02-01 02:40:05,2016-02-01 02:59:20,3183,Exchange Place,40.716248,-74.033463,3210,Pershing Field,40.742676,-74.051788,24676,Subscriber,1976,1
3,1769,2016-02-01 05:11:28,2016-02-01 05:40:58,3214,Essex Light Rail,40.712772,-74.036484,3203,Hamilton Park,40.727596,-74.04425,24700,Subscriber,1974,2
4,935,2016-02-01 05:48:24,2016-02-01 06:03:59,3203,Hamilton Park,40.727596,-74.04425,3214,Essex Light Rail,40.712772,-74.036484,24639,Subscriber,1974,2


### NYC Weather Data (2016)

The data includes:

* Daily maximum, minimum and average temperature (degrees F)
* Daily precipitation, snowfall and snow depth (inches; T means trace, not enough for a value)

In [6]:
weather_schema = StructType([
    StructField('date', DateType(), nullable=True),
    StructField('maximum temperature', IntegerType(), nullable=True),
    StructField('minimum temperature', IntegerType(), nullable=True),
    StructField('average temperature', FloatType(), nullable=True),
    StructField('precipitation', StringType(), nullable=True),
    StructField('snow fall', StringType(), nullable=True),
    StructField('snow depth', StringType(), nullable=True)
])

In [7]:
weather_df = spark.read.csv('weather_data_nyc_centralpark_2016(1).csv', header='true', schema=weather_schema, dateFormat='dd-MM-yyyy', ignoreLeadingWhiteSpace='true', ignoreTrailingWhiteSpace='true')

In [8]:
weather_df.limit(5).toPandas()

Unnamed: 0,date,maximum temperature,minimum temperature,average temperature,precipitation,snow fall,snow depth
0,2016-01-01,42,34,38.0,0.0,0.0,0
1,2016-01-02,40,32,36.0,0.0,0.0,0
2,2016-01-03,45,35,40.0,0.0,0.0,0
3,2016-01-04,36,14,25.0,0.0,0.0,0
4,2016-01-05,29,11,20.0,0.0,0.0,0


# Data Checks and Validation

### NYC Citi Bike Trip Data (February 2016)

Check if User Type has values Subscriber or Customer only:

In [9]:
citibike_df.where((f.col('User Type') != 'Subscriber') & (f.col('User Type') != 'Customer')).count() == 0

True

Check if Gender has values 0, 1 or 2 only:

In [10]:
citibike_df.where((f.col('Gender') != 0) & (f.col('Gender') != 1) & (f.col('Gender') != 2)).count() == 0

True

Check if all data is just for February 2016:

In [11]:
citibike_df.where(~(f.col('Start Time').like('2016-02%') | f.col('Stop Time').like('2016-02%'))).count() == 0

True

### NYC Weather Data (2016)

Check if all data is just from 2016:

In [12]:
weather_df.where(~f.col('date').like('2016%')).count() == 0

True

# Data Prep

### NYC Citi Bike Trip Data (February 2016)

Split Start Time and Stop Time into Start Day and Start Time and Stop Day and Stop Time:

In [13]:
split_start = f.split(citibike_df['Start Time'], ' ')
split_stop = f.split(citibike_df['Stop Time'], ' ')

In [14]:
citibike_df = citibike_df.withColumn('Start Day', split_start.getItem(0))
citibike_df = citibike_df.withColumn('Start Time', split_start.getItem(1))
citibike_df = citibike_df.withColumn('Stop Day', split_stop.getItem(0))
citibike_df = citibike_df.withColumn('Stop Time', split_stop.getItem(1))

In [15]:
citibike_df.limit(5).toPandas()

Unnamed: 0,Trip Duration,Start Time,Stop Time,Start Station ID,Start Station Name,Start Station Latitude,Start Station Longtitude,End Station ID,End Station Name,End Station Latitude,End Station Longtitude,Bike ID,User Type,Birth Year,Gender,Start Day,Stop Day
0,361,00:31:18,00:37:19,3202,Newport PATH,40.727222,-74.03376,3203,Hamilton Park,40.727596,-74.04425,24393,Subscriber,1975,1,2016-02-01,2016-02-01
1,297,01:55:05,02:00:02,3195,Sip Ave,40.730743,-74.063782,3194,McGinley Square,40.725342,-74.067619,24394,Subscriber,1985,2,2016-02-01,2016-02-01
2,1155,02:40:05,02:59:20,3183,Exchange Place,40.716248,-74.033463,3210,Pershing Field,40.742676,-74.051788,24676,Subscriber,1976,1,2016-02-01,2016-02-01
3,1769,05:11:28,05:40:58,3214,Essex Light Rail,40.712772,-74.036484,3203,Hamilton Park,40.727596,-74.04425,24700,Subscriber,1974,2,2016-02-01,2016-02-01
4,935,05:48:24,06:03:59,3203,Hamilton Park,40.727596,-74.04425,3214,Essex Light Rail,40.712772,-74.036484,24639,Subscriber,1974,2,2016-02-01,2016-02-01


### NYC Weather Data (2016)

Filter weather data to include records only for February 2016:

In [16]:
weather_df = weather_df.where(f.col('date').between('2016-02-01', '2016-02-29'))

In [17]:
weather_df.count() == 29

True

In [18]:
weather_df.limit(5).toPandas()

Unnamed: 0,date,maximum temperature,minimum temperature,average temperature,precipitation,snow fall,snow depth
0,2016-02-01,59,44,51.5,0.01,0.0,2
1,2016-02-02,50,38,44.0,0.00,0.0,T
2,2016-02-03,59,42,50.5,0.73,0.0,0
3,2016-02-04,59,44,51.5,T,0.0,0
4,2016-02-05,44,31,37.5,0.53,2.5,1


0.00 < T < 0.01. Let's round up Ts to 0.01 so that the last three columns are all numeric:

In [19]:
weather_df = weather_df.withColumn('precipitation', f.when(f.col('precipitation') == 'T', 0.01).otherwise(f.col('precipitation')).cast(FloatType()))
weather_df = weather_df.withColumn('snow fall', f.when(f.col('snow fall') == 'T', 0.01).otherwise(f.col('snow fall')).cast(FloatType()))
weather_df = weather_df.withColumn('snow depth', f.when(f.col('snow depth') == 'T', 0.01).otherwise(f.col('snow depth')).cast(IntegerType()))

In [20]:
weather_df.limit(5).toPandas()

Unnamed: 0,date,maximum temperature,minimum temperature,average temperature,precipitation,snow fall,snow depth
0,2016-02-01,59,44,51.5,0.01,0.0,2
1,2016-02-02,50,38,44.0,0.0,0.0,0
2,2016-02-03,59,42,50.5,0.73,0.0,0
3,2016-02-04,59,44,51.5,0.01,0.0,0
4,2016-02-05,44,31,37.5,0.53,2.5,1


# Join The Two Datasets

In [21]:
full_df = citibike_df.join(weather_df, citibike_df['Start Day'] == weather_df['date'], how='inner').drop('date')

In [22]:
full_df.limit(5).toPandas()

Unnamed: 0,Trip Duration,Start Time,Stop Time,Start Station ID,Start Station Name,Start Station Latitude,Start Station Longtitude,End Station ID,End Station Name,End Station Latitude,...,Birth Year,Gender,Start Day,Stop Day,maximum temperature,minimum temperature,average temperature,precipitation,snow fall,snow depth
0,361,00:31:18,00:37:19,3202,Newport PATH,40.727222,-74.03376,3203,Hamilton Park,40.727596,...,1975,1,2016-02-01,2016-02-01,59,44,51.5,0.01,0.0,2
1,297,01:55:05,02:00:02,3195,Sip Ave,40.730743,-74.063782,3194,McGinley Square,40.725342,...,1985,2,2016-02-01,2016-02-01,59,44,51.5,0.01,0.0,2
2,1155,02:40:05,02:59:20,3183,Exchange Place,40.716248,-74.033463,3210,Pershing Field,40.742676,...,1976,1,2016-02-01,2016-02-01,59,44,51.5,0.01,0.0,2
3,1769,05:11:28,05:40:58,3214,Essex Light Rail,40.712772,-74.036484,3203,Hamilton Park,40.727596,...,1974,2,2016-02-01,2016-02-01,59,44,51.5,0.01,0.0,2
4,935,05:48:24,06:03:59,3203,Hamilton Park,40.727596,-74.04425,3214,Essex Light Rail,40.712772,...,1974,2,2016-02-01,2016-02-01,59,44,51.5,0.01,0.0,2


# Analysis and Visualization

Count notnull and distinct values:

In [23]:
counts(full_df)

Unnamed: 0,feature,row_count,notnull_count,distinct_count
0,Trip Duration,8250,8250,1571
1,Start Time,8250,8250,7586
2,Stop Time,8250,8250,7587
3,Start Station ID,8250,8250,35
4,Start Station Name,8250,8250,35
5,Start Station Latitude,8250,8250,35
6,Start Station Longtitude,8250,8250,35
7,End Station ID,8250,8250,39
8,End Station Name,8250,8250,39
9,End Station Latitude,8250,8250,39


Calculate distinct count, min, mean, max and standard deviation for columns for which this information is valuable:

In [24]:
describe(full_df[['Trip Duration', 'Start Time', 'Stop Time', 'Start Day', 'Stop Day', 'Birth Year', 'maximum temperature', 'minimum temperature', 'average temperature', 'precipitation', 'snow fall', 'snow depth']])

summary,count,mean,stddev,min,max
feature,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Trip Duration,8250,1424.132,40051.95339126977,61,2104123
Start Time,8250,,,00:01:01,23:59:41
Stop Time,8250,,,00:01:21,23:59:18
Start Day,8250,,,2016-02-01,2016-02-29
Stop Day,8250,,,2016-02-01,2016-03-07
Birth Year,7920,1978.87297979798,9.535096889682688,1947,1999
maximum temperature,8250,47.38363636363636,11.041830727297498,15,61
minimum temperature,8250,33.46787878787879,8.927544634661732,-1,47
average temperature,8250,40.42575757575757,9.706067981896194,7.0,54.0
precipitation,8250,0.1107757582957997,0.2734341007857553,0.0,1.22


Find most popular values and the number of times they occur:

In [25]:
freq_items(full_df)

Unnamed: 0,feature,"freq_items[value, freq]"
0,Trip Duration,"[[305, 32], [150, 30], [242, 29], [245, 27], [..."
1,Start Time,"[[17:34:19, 4], [17:52:32, 4], [09:02:53, 3], ..."
2,Stop Time,"[[09:12:47, 4], [13:04:29, 4], [09:07:34, 4], ..."
3,Start Station ID,"[[3186, 998], [3195, 717], [3203, 682], [3209,..."
4,Start Station Name,"[[Grove St PATH, 998], [Sip Ave, 717], [Hamilt..."
5,Start Station Latitude,"[[40.71958541870117, 998], [40.730743408203125..."
6,Start Station Longtitude,"[[-74.0431137084961, 998], [-74.06378173828125..."
7,End Station ID,"[[3186, 1440], [3195, 661], [3203, 624], [3183..."
8,End Station Name,"[[Grove St PATH, 1440], [Sip Ave, 661], [Hamil..."
9,End Station Latitude,"[[40.71958541870117, 1440], [40.73074340820312..."


Calculate correlations between variables:

In [26]:
corr_matrix(full_df)

The correlation matrix plot Corr.png was located at:
/home/maria/Kaufland IT Hub/Audited


Unnamed: 0,Trip Duration,Start Station ID,Start Station Latitude,Start Station Longtitude,End Station ID,End Station Latitude,End Station Longtitude,Bike ID,Birth Year,Gender,maximum temperature,minimum temperature,average temperature,precipitation,snow fall,snow depth
Trip Duration,1.0,-0.010974,-0.023682,0.022613,-0.00326,-0.01998,0.01881,0.011278,-0.007287,0.007325,-0.034607,-0.038832,-0.037542,-0.0079,-0.003725,-0.00525
Start Station ID,-0.010974,1.0,0.304295,-0.174111,-0.003534,0.052183,0.042294,-0.011723,-0.001169,-0.008885,-0.010285,-0.010587,-0.010718,-0.006927,-0.020095,-0.01799
Start Station Latitude,-0.023682,0.304295,1.0,-0.27767,0.033522,0.474659,-0.285721,-0.001922,-0.018184,-0.080182,-0.004869,-0.001759,-0.003576,0.008255,-0.00497,0.01052
Start Station Longtitude,0.022613,-0.174111,-0.27767,1.0,-0.026737,-0.296585,0.577937,0.004986,-0.007831,0.044079,0.026674,0.030763,0.02932,-0.010111,0.003085,0.023392
End Station ID,-0.00326,-0.003534,0.033522,-0.026737,1.0,0.063775,-0.108479,-0.003683,-0.005745,0.013348,0.006642,0.003715,0.005484,-0.000807,-0.024681,-0.004568
End Station Latitude,-0.01998,0.052183,0.474659,-0.296585,0.063775,1.0,-0.304102,-0.001966,-0.037745,-0.071937,-0.015206,-0.015288,-0.015678,-0.003269,0.009392,0.005592
End Station Longtitude,0.01881,0.042294,-0.285721,0.577937,-0.108479,-0.304102,1.0,-0.002618,0.003584,0.047291,0.016503,0.01918,0.018208,-0.013085,-0.005575,0.008409
Bike ID,0.011278,-0.011723,-0.001922,0.004986,-0.003683,-0.001966,-0.002618,1.0,0.002476,-0.009654,-0.005124,-0.013448,-0.009104,0.012072,0.018574,0.013145
Birth Year,-0.007287,-0.001169,-0.018184,-0.007831,-0.005745,-0.037745,0.003584,0.002476,1.0,0.082169,0.007298,-0.000337,0.00399,-0.005175,0.004706,-0.008337
Gender,0.007325,-0.008885,-0.080182,0.044079,0.013348,-0.071937,0.047291,-0.009654,0.082169,1.0,0.022724,0.021069,0.022611,-0.034632,-0.009821,0.014713


Check most popular hours to take a (long) trip:

In [27]:
(full_df
     .groupby(f.hour('Start Time').alias('Hour'))
     .agg(f.sum('Trip Duration').alias('Total Trip Duration'),
          f.count(f.lit(1)).alias('Total Trips'))
     .sort(f.desc('Total Trip Duration'))
).toPandas()

Unnamed: 0,Hour,Total Trip Duration,Total Trips
0,8,2644987,1168
1,16,2384178,434
2,7,2379639,639
3,18,486769,792
4,17,427550,686
5,10,423564,345
6,14,381342,332
7,11,375820,341
8,12,374920,343
9,15,317541,343


Check the most popular days of the week to take a (long) trip:
* 1 - Sunday
* 2 - Monday
* 3 - Tuesday
* 4 - Wednesday
* 5 - Thursday
* 6 - Friday
* 7 - Saturday

In [28]:
(full_df
     .groupby(f.dayofweek('Start Day').alias('Day of Week'))
     .agg(f.sum('Trip Duration').alias('Total Trip Duration'),
          f.count(f.lit(1)).alias('Total Trips'))
     .sort(f.desc('Total Trip Duration'))
).toPandas()

Unnamed: 0,Day of Week,Total Trip Duration,Total Trips
0,6,6871518,1101
1,1,1066491,1014
2,2,1001266,1591
3,4,811267,1095
4,5,688964,1358
5,7,675221,866
6,3,634362,1225


Check the days of the month with longest/most trips:

In [29]:
(full_df
     .groupby(f.dayofmonth('Start Day').alias('Day of Month'))
     .agg(f.sum('Trip Duration').alias('Total Trip Duration'),
          f.count(f.lit(1)).alias('Total Trips'))
     .sort(f.desc('Total Trip Duration'))
).toPandas()

Unnamed: 0,Day of Month,Total Trip Duration,Total Trips
0,12,6426900,249
1,28,598657,417
2,17,452647,353
3,20,355429,342
4,22,342570,437
5,29,280572,405
6,21,269286,304
7,25,206130,372
8,18,198425,340
9,16,190727,222
