In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.types import StructType, StructField, StringType, LongType, DateType, DoubleType, BooleanType, \
    ArrayType

In [40]:
# Single local Spark session used by every cell below.
# NOTE(review): master "local" runs a single worker thread; "local[*]" would use all
# cores. In local mode the executor lives inside the driver JVM, so
# spark.executor.memory is presumably ignored here — confirm against the Spark docs.
spark = (
    SparkSession.builder
    .appName("YelpHelp")
    .master("local")
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)

# Read only the two fields this notebook needs. `date` arrives as one string of
# ", "-separated timestamps; it is split into individual dates further down.
schema2 = StructType([
    StructField("business_id", StringType(), nullable=True),
    StructField("date", StringType(), nullable=True),
])

dataset2 = spark.read.json(
    "../yelp_dataset/yelp_academic_dataset_checkin.json",
    schema=schema2,
)

In [41]:
import datetime
def convert(x):
    """Split a check-in ``date`` string into its individual timestamp strings.

    The raw field holds every check-in timestamp of a business joined by ", "
    (each timestamp is later parsed with the format "%Y-%m-%d %H:%M:%S").

    Args:
        x: the raw ``date`` value. The read schema declares the column nullable,
            so ``None`` is accepted.

    Returns:
        list[str]: the whitespace-stripped, non-empty timestamp strings in their
        original order. ``None`` or a blank string yields ``[]`` instead of raising
        (``None.split``) or producing ``['']`` (which would break date parsing).
    """
    if not x:
        return []
    # Split on the bare comma and strip, so a missing or extra space after a
    # separator does not leave a malformed timestamp behind.
    return [part.strip() for part in x.split(',') if part.strip()]

def convert2(x):
    """Reduce each "YYYY-MM-DD HH:MM:SS" timestamp string to its "YYYY-MM-DD" date.

    Every element is parsed with ``datetime.strptime``, so a malformed timestamp
    raises ``ValueError``. The resulting date is rendered in ISO format.

    Args:
        x: iterable of timestamp strings.

    Returns:
        list[str]: the ISO date strings, in input order.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    iso_dates = []
    for stamp in x:
        parsed = datetime.datetime.strptime(stamp, timestamp_format)
        iso_dates.append(parsed.date().isoformat())
    return iso_dates

# Turn each (business_id, date-string) row into (business_id, [ISO date strings])
# in one pass; toDF() names the resulting columns `_1` and `_2`.
# NOTE: this rebinds `dataset2`, so running the cell a second time without
# re-running the load cell would fail (column 2 is then already a list).
dataset2 = (
    dataset2.rdd
    .map(lambda row: (row[0], convert2(convert(row[1]))))
    .toDF()
)

In [42]:
# One row per (business, check-in date): explode the `_2` date array into `Checkin`.
df_exploded = dataset2.withColumn("Checkin", func.explode(func.col("_2")))
df_exploded.show(n=5)

+--------------------+--------------------+----------+
|                  _1|                  _2|   Checkin|
+--------------------+--------------------+----------+
|--1UhMGODdWsrMast...|[2016-04-26, 2016...|2016-04-26|
|--1UhMGODdWsrMast...|[2016-04-26, 2016...|2016-08-30|
|--1UhMGODdWsrMast...|[2016-04-26, 2016...|2016-10-15|
|--1UhMGODdWsrMast...|[2016-04-26, 2016...|2016-11-18|
|--1UhMGODdWsrMast...|[2016-04-26, 2016...|2017-04-20|
+--------------------+--------------------+----------+
only showing top 5 rows



In [43]:
# Keep only the key and the exploded check-in, renamed and typed as DateType.
# NOTE: rebinds df_exploded; re-running this cell on its own output fails (`_1` is gone).
df_exploded = df_exploded.select(
    func.col("_1").alias("business_id"),
    func.col("Checkin").cast(DateType()).alias("checkin"),
)

In [44]:
# Preview the typed (business_id, checkin) rows.
df_exploded.show(n=5)

+--------------------+----------+
|         business_id|   checkin|
+--------------------+----------+
|--1UhMGODdWsrMast...|2016-04-26|
|--1UhMGODdWsrMast...|2016-08-30|
|--1UhMGODdWsrMast...|2016-10-15|
|--1UhMGODdWsrMast...|2016-11-18|
|--1UhMGODdWsrMast...|2017-04-20|
+--------------------+----------+
only showing top 5 rows



In [46]:
# Derive the calendar year of each check-in and write the data as JSON, one
# `year=YYYY` sub-directory per year.
# Hash-repartitioning on `year` first sends all rows of a given year to the same
# task, so each year directory normally receives a single part file.
df_exploded = df_exploded.withColumn("year", func.year("checkin")).repartition(10, "year")
# Explicit overwrite: the default save mode ("errorifexists") makes a second
# Restart & Run All fail because this output path already exists.
(
    df_exploded.write
    .mode("overwrite")
    .partitionBy("year")
    .json("../YelpDatasetYearly/CheckIns/yelp_academic_dataset_checkin")
)