In [None]:
## Run all the cells

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Reuse the active Spark session if one exists, otherwise start one named "test32".
spark = (
    SparkSession.builder
    .appName("test32")
    .getOrCreate()
)

In [None]:
# Load the event data; the first CSV row holds column names and Spark infers column types.
randomData = spark.read.options(header='true', inferSchema=True).csv('myFile0-1.csv')

In [None]:
# Expose the event data to Spark SQL under the name `stats` (replacing any earlier view).
randomData.createOrReplaceTempView(name="stats")

In [None]:
# Preview the `stats` view (show() prints its default first 20 rows).
spark.table("stats").show()

In [None]:
# For each event_name, walk its rows in id order and keep the rows that are
# at least the 3rd occurrence of that event AND whose running people_count
# total (for that event) is >= 100; report the largest such running total
# per event.
#
# NOTE(review): the original description asked for "3 or more *consecutive*
# events" with "more than 100" people. RANK() over PARTITION BY event_name
# only counts how many times the event has occurred so far — its rows need
# not be adjacent by id — and the threshold here is inclusive (>= 100).
# Confirm the intended semantics; enforcing true adjacency would need a
# gaps-and-islands key such as
#   ROW_NUMBER() OVER (ORDER BY id)
#     - ROW_NUMBER() OVER (PARTITION BY event_name ORDER BY id).
#
# Both window functions share one named window; the redundant DISTINCT and
# the ORDER BYs inside derived tables (which guarantee no output order) are
# dropped.
spark.sql("""
SELECT   event_name,
         MAX(people_count_sum) AS max_people_count_sum
FROM     (SELECT event_name,
                 RANK() OVER w            AS cont_occurance,
                 SUM(people_count) OVER w AS people_count_sum
          FROM   stats
          WINDOW w AS (PARTITION BY event_name ORDER BY id)) ranked
WHERE    cont_occurance >= 3
  AND    people_count_sum >= 100
GROUP BY event_name
""").show(100)

In [None]:
# Load the floor-access data; first row is the header, column types are inferred.
df = spark.read.csv("data.csv", header=True, inferSchema=True)

In [None]:
# Print the inferred schema. printSchema must be *called*; the bare attribute
# only displays the bound-method repr.
df.printSchema()

In [None]:
# Expose the floor-access data to Spark SQL as `VermontVendor`.
df.createOrReplaceTempView(name="VermontVendor")

In [None]:
# Peek at (up to) 10 arbitrary rows of the view; no ORDER BY, so which rows is unspecified.
vendor_sample = spark.table('VermontVendor').limit(10)
vendor_sample.show()

In [None]:
from pyspark.sql.types import FloatType,StringType
from pyspark.sql.functions import udf,col
import datetime

def int_to_string(x):
    """Return ``str(x)``, passing ``None`` through unchanged.

    Used as a Spark UDF body: returning ``None`` (instead of the string
    ``"None"``) makes Spark produce a real SQL null for null inputs.
    """
    if x is None:
        return None
    return str(x)


def datetime_formatter(dateStr):
    """Reformat a ``"%m/%d/%y %H:%M"`` timestamp string as ISO-8601 text.

    Example: ``"01/15/19 08:30"`` -> ``"2019-01-15T08:30:00.000000Z"``.

    Args:
        dateStr: timestamp text in ``month/day/2-digit-year hour:minute``
            form, or ``None``.

    Returns:
        The reformatted string, or ``None`` when ``dateStr`` is ``None``.

    Raises:
        ValueError: if ``dateStr`` does not match ``"%m/%d/%y %H:%M"``.
    """
    # Local import keeps the function self-contained (presumably so it works
    # when shipped to Spark executors as a UDF).
    import datetime

    if dateStr is None:
        return None
    parsed = datetime.datetime.strptime(dateStr, "%m/%d/%y %H:%M")
    # NOTE(review): the trailing "Z" labels the value as UTC, but `parsed` is
    # naive and no timezone conversion happens — confirm the source data is
    # really recorded in UTC.
    return parsed.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    
# String-returning Spark UDFs built from the helper functions defined in this
# cell. Passing the functions directly (rather than wrapping them in lambdas)
# gives each UDF a meaningful name in query plans and error messages instead
# of "<lambda>".
toStr = udf(int_to_string, StringType())
parseDatetime = udf(datetime_formatter, StringType())

In [None]:
# Keep only rows where every field used below is present, then project them
# into snake_case columns: the person id is stringified and the access
# timestamp is reformatted via the UDFs.
modifieddf = (
    df.filter(
        col("Person Id").isNotNull()
        & col("Floor Access DateTime").isNotNull()
        & col("Floor Level").isNotNull()
        & col("Building").isNotNull()
    )
    .select(
        col("Floor Level").alias("floor_level"),
        col("Building").alias("building"),
        toStr("Person Id").alias("person_id"),
        parseDatetime("Floor Access DateTime").alias("datetime"),
    )
)

In [None]:
# Print the projected schema. printSchema must be *called*; the bare attribute
# only displays the bound-method repr.
modifieddf.printSchema()

In [None]:
# Preview the cleaned rows (explicitly the default 20 rows, with long values truncated).
modifieddf.show(n=20, truncate=True)

In [None]:
# Write the cleaned rows as JSON files under `output_path`, replacing any previous output.
modifieddf.write.json('output_path', mode="overwrite")