<a href="https://colab.research.google.com/github/dubeyabhi07/big-data-spark/blob/master/src/main/pyspark/event/complexToSimple.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [0]:
!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/e9/e4/5c15ab8d354c4e3528510821865e6748209a9b0ff6a1788f4cd36cc2a5dc/pyspark-2.4.6.tar.gz (218.4MB)
[K     |████████████████████████████████| 218.4MB 55kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 42.3MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.6-py2.py3-none-any.whl size=218814406 sha256=9297d7eb2718d9f19bcae4c29d402660a227b549dfa8b80fb026debf70dc5915
  Stored in directory: /root/.cache/pip/wheels/1e/5e/6a/17e906c94ec7246f260330a66e44a06a0809033ba2738a74a8
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.6


In [0]:
from pyspark.sql.functions import *


'''
 processing schedule.csv (was saved in ComplexToSimple.scala)

'''

eventJsonDf = spark.read.json("event.json", multiLine = True)

eventJsonDf.printSchema()

root
 |-- event_data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- event_id: string (nullable = true)
 |    |    |-- reserved: struct (nullable = true)
 |    |    |    |-- confirmed: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- address: string (nullable = true)
 |    |    |    |    |    |-- city: string (nullable = true)
 |    |    |    |    |    |-- slots: long (nullable = true)
 |    |    |    |-- waitlist: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- city: string (nullable = true)
 |    |    |    |    |    |-- slots: long (nullable = true)
 |    |    |-- schedule: struct (nullable = true)
 |    |    |    |-- Bangalore: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- cost: string (nullable = true)
 |    |    |    |    |    |-- date: 

In [0]:
 #removing the cover, retrieving data and converting the outer fields into columns
df = eventJsonDf.select(explode(col("event_data")).alias("temp_field"))\
      .select(col("temp_field.*"))
df.show(5)
df.printSchema()

+--------------------+--------------------+--------------------+
|            event_id|            reserved|            schedule|
+--------------------+--------------------+--------------------+
|c8a478e8-cfdc-466...|[[[4290 Hayes Ter...|[[[$5.51, 04/12/2...|
|37217a8c-18ab-4b8...|[[[713 Waxwing Pl...|[[[$9.41, 11/02/2...|
|4c04d44e-1bd7-471...|[[[64 Sunnyside C...|[[[$0.71, 06/14/2...|
|b2f5620e-e756-411...|[[[4 Little Fleur...|[[[$9.34, 09/27/2...|
|34c1e662-d85a-469...|[[[61992 Bartillo...|[[[$1.83, 04/29/2...|
+--------------------+--------------------+--------------------+
only showing top 5 rows

root
 |-- event_id: string (nullable = true)
 |-- reserved: struct (nullable = true)
 |    |-- confirmed: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- address: string (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- slots: long (nullable = true)
 |    |-- waitlist: array (nullable = true)
 |    | 

In [0]:

def getFlattenedConfirmedReservations(confirmedDf):
  df = confirmedDf.withColumn("confirmed_entry_map",
      create_map(col("confirmed_entries.city"),struct(col("confirmed_entries.address"), col("confirmed_entries.slots"))))\
      .withColumn("slots", col("confirmed_entries.slots"))\
      .drop(col("confirmed_entries"))

  df = df.select(col('event_id'), explode(col('confirmed_entry_map')), col('slots'))\
      .select(col("event_id"), col("key").alias("confirmed_city"), col("value").alias("details"), col('slots'))
      
  df = df.groupBy(col("event_id"), col("confirmed_city"))\
      .agg(collect_list("details").alias("details"),sum(col("slots")).alias("total_confirmed_slots"))

  print("confirmed reservation structure flattened .........................................")
  df.show(5)
  df.printSchema()
  return df

def getFlattenedWaitlistedReservations(waitlistDf):
  df = waitlistDf.select(col('event_id'), col("waitlisted_entries.*"))
  df = df.groupBy(col("event_id"), col("city").alias("waitlist_city"))\
      .agg(sum(col("slots")).alias("total_waitlist_slots"))

  print("waitlist reservation structure flattened .........................................")
  df.show(5)
  df.printSchema()
  return df
  

def getFlattenedReservedDf(reservedDf):
  df = reservedDf.select(col('event_id'), col("reserved.*"))

  confirmedDf = df.withColumn("confirmed_entries", explode(col("confirmed")))\
  .drop(col('confirmed')).drop(col('waitlist'))
  confirmedDf = getFlattenedConfirmedReservations(confirmedDf)

  waitlistDf = df.withColumn("waitlisted_entries", explode(col("waitlist"))).\
      drop(col('confirmed')).drop(col('waitlist'))
  waitlistDf = getFlattenedWaitlistedReservations(waitlistDf)

  df = waitlistDf.join(confirmedDf,
      (confirmedDf["event_id"] == waitlistDf["event_id"]) &
        (waitlistDf["waitlist_city"] == confirmedDf["confirmed_city"]), "outer")\
      .select(confirmedDf["event_id"], col('confirmed_city'), col('details'),
        col('total_confirmed_slots'), col('waitlist_city'), col('total_waitlist_slots'))\
      .where(col("event_id").isNotNull())
     

  print("reserved structured flattened .........................................")
  df.show(5)
  df.printSchema()
  return df

In [0]:
reservedDf = df.select(col('event_id'), col('reserved'))
reservedDf = getFlattenedReservedDf(reservedDf)

confirmed reservation structure flattened .........................................
+--------------------+--------------+--------------------+---------------------+
|            event_id|confirmed_city|             details|total_confirmed_slots|
+--------------------+--------------+--------------------+---------------------+
|da347978-162e-4f5...|         Delhi|[[3485 Trailsway ...|                    3|
|002ce8ce-3609-490...|     Bengaluru|[[354 8th Crossin...|                    4|
|10487f86-8343-4d9...|     Bengaluru|[[96894 Cascade H...|                    6|
|56fee556-77da-4fb...|        Mumbai|[[50596 Springs R...|                    1|
|9dbd178d-c4f4-4ae...|       Chennai|[[20159 Pepper Wo...|                    5|
+--------------------+--------------+--------------------+---------------------+
only showing top 5 rows

root
 |-- event_id: string (nullable = true)
 |-- confirmed_city: string (nullable = false)
 |-- details: array (nullable = true)
 |    |-- element: struct (conta

In [0]:
from pyspark.sql.types import *
collectUdf = udf(lambda cols,values: dict(zip(cols,values)),MapType(StringType(),
                                                      ArrayType(StructType([StructField("cost",StringType(),True),
                                                                           StructField("date",StringType(),True)]))))


def getFlattenedScheduleDf(scheduleDf):

  cols = scheduleDf.select(col("schedule.*")).columns
  df = scheduleDf.select(col('event_id'), col("schedule.*"))
  df = df.withColumn("all", array(cols))

  #Pyspark does not allow to create an array column using lit(), hence following work-around
  tempList = []
  for cityColumn in cols:
    tempList.append(lit(cityColumn))

  #converting this to array as zip need to iterables
  df = df.withColumn("city_events_map", collectUdf(array(tempList), df["all"]))\
      .select(col('event_id'),col('city'),explode(col("city_events_map")).alias("city", "schedules"))\
      .withColumn("schedule", explode(col("schedules")))\
      .select(col('event_id'), col('city'), col("schedule.*"))

  print("schedule structure flattened .........................................")
  df.show(5)
  df.printSchema()
  return df

In [0]:
scheduleDf = df.select(col('event_id'), col('schedule'))
scheduleDf = getFlattenedScheduleDf(scheduleDf)

schedule structure flattened .........................................
+--------------------+-------+-----+----------+
|            event_id|   city| cost|      date|
+--------------------+-------+-----+----------+
|c8a478e8-cfdc-466...|  Delhi|$0.26|03/16/2020|
|c8a478e8-cfdc-466...|  Delhi|$6.68|01/24/2020|
|c8a478e8-cfdc-466...|  Delhi|$0.44|04/20/2020|
|c8a478e8-cfdc-466...|Kolkata|$4.56|12/29/2019|
|c8a478e8-cfdc-466...|Kolkata|$8.41|05/16/2020|
+--------------------+-------+-----+----------+
only showing top 5 rows

root
 |-- event_id: string (nullable = true)
 |-- city: string (nullable = false)
 |-- cost: string (nullable = true)
 |-- date: string (nullable = true)



In [0]:

'''
Saving the simpler files.

'''

scheduleDf.coalesce(1)\
    .write.format("csv")\
    .option("header", "true")\
    .mode("overwrite")\
    .save("output/schedule.csv")

#This method saves single JSON object in a row, without comma separation
reservedDf.coalesce(1)\
      .write.format("json")\
      .mode("overwrite")\
      .json("output/reserved.json")