Utility functions for loading and handling processed data

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import geopandas as gp
import pyspark.sql.functions as F
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [2]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [3]:
import pickle

In [4]:
# Reuse the already-running SparkContext if there is one, so re-running this cell does not
# fail with "Cannot run multiple SparkContexts at once". If a context already exists,
# getOrCreate returns it and ignores this conf.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]').setAppName('temp'))
# NOTE(review): presumably kept so that rdd.toDF(...) in read_in_range is available —
# PySpark attaches toDF to RDDs when a SQL session is created; confirm before removing.
sqlContext = SQLContext(sc)

22/07/26 12:15:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/26 12:15:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/07/26 12:15:32 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
# Layout of one fixed-length trip record in the monthly *TNP.dat files.
# np.dtype built from a field list is packed (no alignment padding), so the
# record size is the plain sum of the field widths: 104 bytes.
# Field order and widths presumably mirror whatever wrote the files — TODO confirm.
dt = np.dtype([
    ('id', np.ulonglong),
    ('start year', np.short),
    ('start month', np.byte),
    ('start day', np.byte),
    ('start hour', np.byte),
    ('start minute', np.byte),
    ('start julian', np.double),
    ('end year', np.short),
    ('end month', np.byte),
    ('end day', np.byte),
    ('end hour', np.byte),
    ('end minute', np.byte),
    ('trip seconds', np.uint32),
    ('trip miles', np.single),
    ('pickup census tract', np.ulonglong),
    ('dropoff census tract', np.ulonglong),
    ('pickup community area', np.byte),
    ('dropoff community area', np.byte),
    ('fare', np.single),
    ('tip', np.single),
    ('addcharge', np.single),
    ('trip total', np.single),
    ('st auth', np.byte),
    ('pool', np.byte),
    ('pickup lat', np.double),
    ('pickup lon', np.double),
    ('dropoff lat', np.double),
    ('dropoff lon', np.double),
])

In [6]:
def read_array(x):
    """Decode one binary trip record into a tuple of Python scalars.

    The bytes are interpreted with the module-level structured dtype ``dt``;
    the returned tuple has one value per field of ``dt``, in field order.
    If ``x`` happens to contain more than one record, only the first is
    returned.

    Parameters
    ----------
    x : bytes-like
        Raw record bytes (read_in_range maps this over sc.binaryRecords).
    """
    record_bytes = bytes(x)
    records = np.frombuffer(record_bytes, dtype=dt)
    # tolist() turns numpy scalars into plain Python ints/floats,
    # which Spark can infer a schema from.
    as_python = records.tolist()
    return as_python[0]

In [7]:
def read_in_range(start_year,start_month,end_year,end_month,path='/media/benjamin/Data/Chicago_Transit/TNP/'):
    """Load every monthly TNP binary file in an inclusive month range as one Spark DataFrame.

    Files are expected at ``path + 'YYYYMMTNP.dat'``. Each file is read as
    fixed-length records whose layout is the module-level dtype ``dt``, and
    every record is decoded by ``read_array`` into one row.

    Parameters
    ----------
    start_year, start_month : int
        First month to load (month is 1-12).
    end_year, end_month : int
        Last month to load, inclusive.
    path : str
        Directory prefix prepended to each file name (must end with a separator).

    Returns
    -------
    pyspark.sql.DataFrame
        One row per trip record, with one column per field of ``dt``.

    Raises
    ------
    ValueError
        If the end month is before the start month.
    """
    # Inclusive month count, e.g. 2020-06 .. 2022-03 -> 22 months.
    nmonths = (end_year - start_year) * 12 + (end_month - start_month) + 1
    if nmonths < 1:
        raise ValueError(
            "end month {0:04d}-{1:02d} is before start month {2:04d}-{3:02d}".format(
                end_year, end_month, start_year, start_month))

    # Number months consecutively so integer division handles the December -> January roll-over.
    first = start_year * 12 + (start_month - 1)
    fnames = ','.join(
        path + "{0:04d}{1:02d}".format(m // 12, m % 12 + 1) + 'TNP.dat'
        for m in range(first, first + nmonths)
    )

    # Spark accepts a comma-separated list of input paths; each record is dt.itemsize (104) bytes.
    rdd = sc.binaryRecords(fnames, dt.itemsize)
    # Column names come from the same dtype read_array decodes with, so they stay in
    # step with the tuple order (a hand-typed list had misspelled 'dropoff lat').
    return rdd.map(read_array).toDF(list(dt.names))


In [8]:
def area_out(df,ca,to_pandas=True):
    """Per-hour pickup counts for a single community area, in chronological order.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Trip records with the 'start ...' columns and 'pickup community area'
        (as produced by read_in_range).
    ca : int
        Pickup community area to keep.
    to_pandas : bool
        If True, collect the result into a pandas DataFrame; otherwise return
        a Spark DataFrame.

    Returns
    -------
    pandas.DataFrame or pyspark.sql.DataFrame
        One row per distinct (start year, month, day, hour, 'start julian',
        area) group with its 'count', sorted by year, month, day and hour.
        The four date columns are renamed to 'year', 'month', 'day', 'hour'.
    """
    short_names = {
        'start year': 'year',
        'start month': 'month',
        'start day': 'day',
        'start hour': 'hour',
    }
    time_keys = list(short_names)
    # NOTE(review): 'start julian' is also a grouping key; if it carries sub-hour
    # precision, each hour is split into several groups — confirm this is intended.
    grouped = df.groupby(time_keys + ['start julian', 'pickup community area']).count()
    selected = grouped.where(grouped['pickup community area'] == ca).sort(time_keys)

    if to_pandas:
        return selected.toPandas().rename(columns=short_names)

    for long_name, short_name in short_names.items():
        selected = selected.withColumnRenamed(long_name, short_name)
    return selected

In [9]:
def one_time_count(df,yr,mo,dy,hr):
    """Pickup counts per community area for trips starting in one clock hour.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Trip records with 'start year', 'start month', 'start day',
        'start hour' and 'pickup community area' columns.
    yr, mo, dy, hr : int
        The start year, month, day and hour to select.

    Returns
    -------
    pandas.DataFrame
        Columns 'pickup community area' and 'count' (suitable for plot_chicago).
    """
    in_that_hour = (
        (df['start year'] == yr)
        & (df['start month'] == mo)
        & (df['start day'] == dy)
        & (df['start hour'] == hr)
    )
    per_area = df.where(in_that_hour).groupby(['pickup community area']).count()
    return per_area.toPandas()

In [10]:
def plot_chicago(counts, shape_path='/media/benjamin/Data/Chicago_Transit/Shapes/geo_export_43aec312-120e-4284-b1ca-b89761679d63.shp'):
    """Join pickup counts onto Chicago community-area boundaries and draw a choropleth.

    Parameters
    ----------
    counts : pandas.DataFrame
        Must contain a 'pickup community area' column and a 'count' column,
        e.g. the output of one_time_count.
    shape_path : str
        Shapefile with the community-area boundaries; its 'area_num_1' column
        holds the community-area number.

    Returns
    -------
    geopandas.GeoDataFrame
        The boundaries left-joined with ``counts``. Areas without a matching
        row in ``counts`` get a missing 'count'.
    """
    chicago = gp.read_file(shape_path)
    # Cast the shapefile's area number to int32 and expose it under the same column
    # name that counts uses, so the two frames can be merged on it.
    chicago['pickup community area'] = chicago['area_num_1'].astype(np.int32)
    with_count = chicago.merge(counts, on='pickup community area', how='left')
    _, ax = plt.subplots(1, 1)
    with_count.plot(column='count', ax=ax, legend=True, legend_kwds={'label': 'Pickup Count'})
    ax.set_title('Pickups by community area')
    return with_count

In [11]:
def get_coefficients(counts, reg_param=0.1):
    """Fit a harmonic linear regression of pickup count against start time.

    Six features are built from 'start julian': a sine and a cosine at periods of
    0.5, 1 and 7 units. Presumably 'start julian' is in days, which gives
    12-hour, daily and weekly cycles (TODO confirm). The features are then
    regressed onto the 'count' column.

    Parameters
    ----------
    counts : pyspark.sql.DataFrame
        Must have 'start julian' and 'count' columns, e.g.
        ``area_out(df, ca, to_pandas=False)``.
    reg_param : float
        Passed to LinearRegression as ``regParam`` (default 0.1, as before).

    Returns
    -------
    pyspark.ml.regression.LinearRegressionModel
        The fitted model, despite the function's name. Its ``coefficients`` are
        ordered twhrsine, twhrcos, daysine, daycos, weeksine, weekcos.
    """
    julian = F.col('start julian')
    to_fit = counts
    feature_cols = []
    # (feature prefix, period in units of 'start julian'); this order fixes the coefficient order.
    for prefix, period in (('twhr', 0.5), ('day', 1.0), ('week', 7.0)):
        # Use np.pi rather than the truncated 3.14159 so the periods are exact.
        phase = 2 * np.pi * julian / period
        to_fit = (to_fit
                  .withColumn(prefix + 'sine', F.sin(phase))
                  .withColumn(prefix + 'cos', F.cos(phase)))
        feature_cols += [prefix + 'sine', prefix + 'cos']

    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    final = assembler.transform(to_fit).selectExpr('features', 'count as label')
    lr = LinearRegression(regParam=reg_param)
    return lr.fit(final)

    

In [None]:
def process_user_input(time):
    """Placeholder for turning a user-supplied time into model inputs.

    TODO: not implemented. The cell previously held a ``def`` with an empty body,
    which is a SyntaxError and stopped Restart & Run All; raising keeps the
    notebook runnable until the real implementation is written.

    Raises
    ------
    NotImplementedError
        Always.
    """
    raise NotImplementedError("process_user_input is not implemented yet")

In [12]:
# All trips from June 2020 through March 2022 (inclusive) as one lazy Spark DataFrame.
df = read_in_range(start_year=2020, start_month=6, end_year=2022, end_month=3)

In [None]:
# Fit one harmonic pickup-count model per community area (1-77), keyed by area number.
NbdModels = {}
# df is derived lazily from the binary files, and every area_out call scans it;
# caching it for the loop avoids re-reading and re-decoding all months 77 times.
df.cache()
try:
    for area in range(1, 78):
        NbdModels[area] = get_coefficients(area_out(df, area, to_pandas=False))
        print(area)  # progress marker: each fit is a full Spark job
finally:
    df.unpersist()

22/07/26 12:29:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/07/26 12:29:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/07/26 12:29:40 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/07/26 12:29:40 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

1


                                                                                

2


                                                                                

3


                                                                                

4


                                                                                

5


                                                                                

6


                                                                                

7


                                                                                

8


                                                                                

9


                                                                                

10


                                                                                

11


                                                                                

12


