# Motivation
Create a notebook that collects useful PySpark demos based on the code that I have written over time. Nothing fancy, just useful helper functions.

In [1]:
# pyspark stuff
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.window import Window

#other libraries
import datetime
import os
import time
import numpy as np
import pandas as pd

In [2]:
# initialize spark context
# The application name embeds the JupyterHub user; a missing JUPYTERHUB_USER
# environment variable raises KeyError here.
jupyter_user = os.environ['JUPYTERHUB_USER']
session_name = 'geo:{}:oneway_detection'.format(jupyter_user)

# ---- create (or reuse) a Hive-enabled spark session with configurations
spark = (
    SparkSession.builder
    .appName(session_name)
    .config('spark.cores.max', 32)
    .enableHiveSupport()
    .getOrCreate()
)

# print spark ui weburl (last expression of the cell, so it is shown as output)
spark.sparkContext.uiWebUrl

'http://100.108.215.151:4040'

## Create dummy dataframe

In [3]:
# Small in-memory frame with two string columns: four zip codes, two per dma value.
zip_dma_rows = [
    Row(zip_code=zip_code, dma=dma)
    for zip_code, dma in [
        ('58542', 'MIN'),
        ('58701', 'MIN'),
        ('57632', 'MAX'),
        ('58734', 'MAX'),
    ]
]
data = spark.createDataFrame(zip_dma_rows)

data.show()

+---+--------+
|dma|zip_code|
+---+--------+
|MIN|   58542|
|MIN|   58701|
|MAX|   57632|
|MAX|   58734|
+---+--------+



In [7]:
# Print the column names, types and nullability of the dummy dataframe.
data.printSchema()

root
 |-- dma: string (nullable = true)
 |-- zip_code: string (nullable = true)



## UDFs

In [4]:
# Return multiple values from a UDF by declaring a struct return type.
# (This is a plain Python `udf`, not a pandas UDF.)
# Every field is nullable: the UDF fills them with dict.get() lookups, which
# yield None whenever a tag is absent, and a None in a non-nullable field
# violates the declared schema.
schema = StructType([
    StructField("roadname", StringType(), True),
    StructField("roadtype", StringType(), True),
    StructField("lanes", StringType(), True),
    StructField("is_oneway", StringType(), True),
    StructField("is_tunnel", StringType(), True)
])

def get_meta_data(tags):
    """Pick selected road attributes out of a tags map.

    Note that tags is a map data type, so it is accessed with dict-style
    ``.get`` lookups.

    Args:
        tags: mapping of tag name to value, or None when the map is null.

    Returns:
        A Row with fields (roadname, roadtype, lanes, is_oneway, is_tunnel)
        read from the keys "name", "highway", "lanes", "oneway" and "tunnel".
        Keys that are absent produce None for that field. Returns None when
        ``tags`` itself is None, instead of failing on ``None.get``.
    """
    if tags is None:
        return None
    return Row('roadname', 'roadtype','lanes','is_oneway','is_tunnel')(tags.get("name"), tags.get("highway"),tags.get("lanes"),tags.get("oneway"),tags.get("tunnel"))

# Register the function as a Spark UDF whose result is the struct described by `schema`.
udf_metadata = udf(f=get_meta_data,returnType= schema)

In [5]:
# another udf where I am returning an array of arrays
#Get the segment order
def get_order(nodeids):
    """Turn an ordered sequence of node ids into consecutive [from, to] pairs.

    Args:
        nodeids: iterable of node ids in travel order, or None (a null array).

    Returns:
        A list of two-element lists ``[previous, current]``, one per adjacent
        pair. Empty when ``nodeids`` is None or has fewer than two elements.
        A None element never acts as the ``previous`` side of a pair.
    """
    if nodeids is None:
        # A null array column would otherwise fail with "NoneType is not iterable".
        return []

    segments = []
    prev = None
    for node in nodeids:
        if prev is not None:
            segments.append([prev, node])
        prev = node
    return segments

# Spark UDF wrapper: the result column is an array of arrays of longs.
udf_get_order = udf(
    f=get_order,
    returnType=ArrayType(ArrayType(LongType())),
)

In [6]:
# Always verify the Python type of the value a UDF returns; do not assume it.
# Python is not statically typed like Java or C++, so a mismatch between the
# returned object and the declared returnType only shows up at run time
# (this one cost 30 minutes of debugging). Hence the explicit float() below.

from scipy.stats import normaltest
def normality_test(x):
    """Return the p-value of scipy's normaltest for a sample.

    Args:
        x: sequence of numeric observations, or None.

    Returns:
        The p-value as a builtin ``float``, or None when ``x`` is None or has
        fewer than 8 observations.
    """
    # normaltest relies on skewtest, which is documented as requiring at
    # least 8 observations; depending on the SciPy version a smaller sample
    # raises or yields NaN. Returning None keeps one small group from
    # aborting the whole job.
    min_samples = 8
    if x is None or len(x) < min_samples:
        return None
    return float(normaltest(x)[1])

# Spark UDF wrapper: the p-value is exposed as a FloatType column.
udf_normality_test = udf(normality_test, FloatType())

## Create external table in hive

In [None]:
# DDL for an external Parquet table partitioned by (date, hour).
# `table_name`, `bucket_name` and `prefix` are not defined in this cell; they
# must already exist in the notebook namespace before it is run.
sql_template = '''
    CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} (
        prev_way_id long,
        way_id long,
        turn_count int,
        city_id int,
        version STRING
    )
    partitioned by (
        date date,
        hour int
        
    )
    STORED AS PARQUET
    LOCATION 's3a://{bucket_name}/{prefix}/'
    '''
sql = sql_template.format(table_name=table_name, bucket_name=bucket_name, prefix=prefix)

# Assign to `_` so the resulting DataFrame is not echoed as cell output.
_ = spark.sql(sql)

### Write dataframe to hive

In [None]:
import time

# Run parameters.
# `table_name`, `bucket_name`, `prefix` and `prefix_name` are not defined in
# this cell; they must already exist in the notebook namespace.
year=2018
month=11
city_id=10
map_version='v1.1'

# Use GMT as the session time zone for the timestamp conversions below.
spark.conf.set("spark.sql.session.timeZone", "Etc/GMT")

for day in range(1,7):
    date='{}-{:02d}-{:02d}'.format(year,month,day)
    for hour in range(0,24):
        start_time = time.time()
        # The template mixes positional fields (year/month/day/hour) with the
        # named fields {prefix_name} and {bucket_name}, so the named ones must
        # be passed as keywords or str.format raises KeyError.
        # NOTE(review): an s3a URI normally starts with the bucket; confirm the
        # order of {prefix_name}/{bucket_name} for the input location.
        read_path='s3a://{prefix_name}/{bucket_name}/year={}/month={:02d}/day={:02d}/hour={:02d}'\
        .format(year,month,day,hour,prefix_name=prefix_name,bucket_name=bucket_name)

        # Best effort: any failure to read this hour is treated as "no data"
        # and the hour is skipped, but the reason is printed rather than hidden.
        try:
            df=spark.read.parquet(read_path)
        except Exception as e:
            print("speed data does not exist for ",date, " for city ",city_id, " and hour ",hour)
            print("read failed with: ",e)
            continue


        df=df.select("col1","col2","col3", F.col("qwerty_xyz").cast("long").alias("col4"),"time_stamp")

        #some time stamp related stuff
        # NOTE(review): the "u" pattern letter is handled differently by the
        # Spark 3 datetime formatter - confirm against the Spark version in use.
        df=df.withColumn("ts",F.from_unixtime("time_stamp")).withColumn("day_of_week",F.date_format("ts","u"))


        df=df.orderBy("col1", "time_stamp")
        col1_counts=df.groupBy("col1").agg(F.count("time_stamp").alias('num_pings'))
        # Join on the shared key by name: the selected frame only has col1..col4
        # and time_stamp, and a name-based join keeps a single `col1` column,
        # so no separate drop of the duplicate key is needed.
        df=df.join(col1_counts,on="col1")



        #windows for transition
        # For every col1, look at the previous col2 value in time order and
        # keep only the rows where the value actually changed.
        wspec=Window.partitionBy('col1').orderBy("time_stamp")
        df=df.withColumn("prev_col2",F.lag(df.col2, 1).over(wspec))
        df=df.select(['col1','prev_col2','col2']).distinct()
        df=df.filter("prev_col2 is not NULL")
        df=df.filter(df.col2!=df.prev_col2)

        # Count the distinct col1 values per (prev_col2, col2) transition and
        # attach the constant metadata columns.
        transition_count=df.groupBy('prev_col2','col2').agg(F.count('col1').alias('turn_count'))
        transition_count = transition_count.withColumn("city_id", F.lit(city_id))
        # The version written is the `map_version` parameter defined at the top of this cell.
        transition_count = transition_count.withColumn("version", F.lit(map_version))
        transition_count = transition_count.withColumn("date", F.lit(date))
        transition_count = transition_count.withColumn("hour", F.lit(hour))
        s3path="s3a://{bucket_name}/{prefix}/date={date}/".format(bucket_name=bucket_name, prefix=prefix,date=date)

        #transition_count.show(10)
        #print(transition_count.schema)
        transition_count.cache()
        # partitionBy('hour') makes the writer create the hour=<h>/ sub-directory under s3path.
        transition_count.repartition('date','hour').write.partitionBy('hour').mode('append').format('parquet').save(s3path)
        print("updating table partitions now at path ",s3path)

        # Register the freshly written directory as a partition of the Hive table.
        hour_path=s3path+"hour={}/".format(hour)
        sql='''ALTER TABLE {table_name} ADD IF NOT EXISTS PARTITION (date='{date}',hour={hour})
        LOCATION '{hour_path}' '''.format(table_name=table_name,date=date,hour=hour,hour_path=hour_path)
        _ = spark.sql(sql)
        print('finished hour ',hour, 'for date ',date)
        transition_count.unpersist()
        print("--- %s seconds ---" % (time.time() - start_time))
