
# TP March 2024

In this section, we process streaming data about restaurants. Before
running this notebook, run the notebook `TP_streamingsource.ipynb` to
start the data stream.

In [1]:
#!pip install pyspark


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
root_path = "/content/drive/MyDrive/Colab Notebooks"


# Processing the streaming data

In [4]:
from pyspark.sql import SparkSession

# Initialize the Spark session
spark = (SparkSession
         .builder
         .appName("JsonStreamingSimulation")
         .getOrCreate())

# Allow automatic schema inference when reading the stream
spark.conf.set("spark.sql.streaming.schemaInference", True)

# Optional: define an explicit schema instead of relying on inference
# from pyspark.sql.types import StructType, StructField, StringType
# schema = StructType([...])  # Define your schema according to the JSON structure

# Read the streamed JSON files, at most one file per trigger
json_df = (spark
           .readStream
           .option("maxFilesPerTrigger", 1)
           .json(root_path + '/json_files_streamed'))

# Write the stream into an in-memory table named "streamed_table" (for demo purposes)
dataStreamWriter = (json_df
                    .writeStream
                    .queryName("streamed_table")
                    .outputMode("append")
                    .format("memory"))

# Start the query; it keeps running in the background
query = dataStreamWriter.start()
# query.awaitTermination()  # would block the notebook until the query stops



# Reviewing streaming process status

In [14]:
print(query.status)


{'message': 'Getting offsets from FileStreamSource[file:/content/drive/MyDrive/Colab Notebooks/json_files_streamed]', 'isDataAvailable': True, 'isTriggerActive': True}


# Printing the schema of the streamed json data

In [6]:
json_df.printSchema()


root
 |-- address: struct (nullable = true)
 |    |-- building: string (nullable = true)
 |    |-- coord: struct (nullable = true)
 |    |    |-- coordinates: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- zipcode: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- cuisine: string (nullable = true)
 |-- grades: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: struct (nullable = true)
 |    |    |    |-- $date: long (nullable = true)
 |    |    |-- grade: string (nullable = true)
 |    |    |-- score: long (nullable = true)
 |-- name: string (nullable = true)
 |-- restaurant_id: string (nullable = true)



# Printing the schema of the SQL table where data is stored

In [7]:
# show() prints the table and returns None, so `describe` holds None, not a DataFrame
describe = spark.sql("DESCRIBE EXTENDED streamed_table").show(20)

print(type(describe))

+-------------+--------------------+-------+
|     col_name|           data_type|comment|
+-------------+--------------------+-------+
|      address|struct<building:s...|   NULL|
|      borough|              string|   NULL|
|      cuisine|              string|   NULL|
|       grades|array<struct<date...|   NULL|
|         name|              string|   NULL|
|restaurant_id|              string|   NULL|
+-------------+--------------------+-------+

<class 'NoneType'>


To visualize the schema better, we read the in-memory table
`streamed_table` into a regular (batch) dataframe `df`.

In [8]:
df = spark.read.table("streamed_table")
df.printSchema()


root
 |-- address: struct (nullable = true)
 |    |-- building: string (nullable = true)
 |    |-- coord: struct (nullable = true)
 |    |    |-- coordinates: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- zipcode: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- cuisine: string (nullable = true)
 |-- grades: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: struct (nullable = true)
 |    |    |    |-- $date: long (nullable = true)
 |    |    |-- grade: string (nullable = true)
 |    |    |-- score: long (nullable = true)
 |-- name: string (nullable = true)
 |-- restaurant_id: string (nullable = true)



In [9]:
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


# Printing the data stored in the SQL table

In [15]:
# Now you can query the in-memory table
result_df = spark.sql("SELECT * FROM streamed_table LIMIT 10")
result_df.show(10)



+--------------------+-------------+-------+------+--------------------+-------------+
|             address|      borough|cuisine|grades|                name|restaurant_id|
+--------------------+-------------+-------+------+--------------------+-------------+
|{4028, {[-73.8299...|       Queens|  Other|    []|                    |     50018982|
|{13542, {[-73.830...|       Queens|  Other|    []|                    |     50018984|
|{971, {[-73.96461...|    Manhattan|  Other|    []|              Subway|     50018987|
|{325, {[-73.99514...|    Manhattan|  Other|    []|Fairfield Inn Sui...|     50018989|
|{399, {[-73.90643...|     Brooklyn|  Other|    []|                    |     50018993|
|{461, {[-74.13849...|Staten Island|  Other|    []|         Indian Oven|     50018994|
|{921, {[-73.96913...|     Brooklyn|  Other|    []|        Cold Press'D|     50018995|
|{190, {[-73.94547...|     Brooklyn|  Other|    []|    Northside Bakery|     50018952|
|{307, {[-74.14538...|Staten Island|  Other

# Tasks

## Task 1

Generate a dataframe called `transformed_df` from the dataframe `df`.
Use the grades to generate a table of 5 columns (restaurant_id, name,
date, grade, score). Review the output table of `Exercise 7` from TD2.
You can ask ChatGPT4 or Mixtral to generate at least three solutions.
Explain the solutions.

## Task 2

This is a solution to Task 1. Explain how it works.

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.sql.functions import from_unixtime

In [16]:
from pyspark.sql.functions import col, explode, from_unixtime

# 'df' is the DataFrame read from the in-memory table "streamed_table"

# 'grades' is a top-level array of structs: explode() emits one row per array
# element (restaurants with an empty grades array produce no rows), then the
# second select pulls the nested fields out of each element
transformed_df = df.select(
    col('restaurant_id'),
    col('name'),
    explode(col('grades')).alias('grades_flat')
).select(
    'restaurant_id',
    'name',
    col('grades_flat.date.$date').alias('date'),
    col('grades_flat.grade').alias('grade'),
    col('grades_flat.score').alias('score')
)

# '$date' holds epoch milliseconds: divide by 1000 to get seconds,
# then format the result as yyyy-MM-dd
transformed_df = transformed_df.withColumn(
    'date', from_unixtime(col('date') / 1000, 'yyyy-MM-dd')
)

# Show the 10 most recent grades
#transformed_df.show(10)
transformed_df.sort(col("date").desc()).show(10)



+-------------+--------------------+----------+--------------+-----+
|restaurant_id|                name|      date|         grade|score|
+-------------+--------------------+----------+--------------+-----+
|     50018344|Las Qrquideas Res...|2015-01-20|Not Yet Graded|    2|
|     50017941|           Chez Alex|2015-01-20|Not Yet Graded|   30|
|     50018174|     Birdbath Spring|2015-01-20|Not Yet Graded|    9|
|     50018254|           M&J Pizza|2015-01-20|Not Yet Graded|   22|
|     50018661|      Angebienvendia|2015-01-20|Not Yet Graded|    2|
|     50018283|M & K Spanish Res...|2015-01-20|Not Yet Graded|    2|
|     50018173|            Cha Lait|2015-01-20|Not Yet Graded|    2|
|     50018587|          Elk Coffee|2015-01-20|Not Yet Graded|    2|
|     50018376| Ab Halal Restaurant|2015-01-20|Not Yet Graded|   58|
|     50018289|      Gui Lin Mi Fen|2015-01-20|Not Yet Graded|   12|
+-------------+--------------------+----------+--------------+-----+
only showing top 10 rows
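
To see why the two `select` calls yield one row per grade, the same reshaping can be mimicked in plain Python on a single record. This is only an illustrative sketch: the sample record and the helper name `flatten_grades` are made up, and UTC is assumed for the date conversion (Spark's `from_unixtime` uses the session time zone instead).

```python
from datetime import datetime, timezone

# Made-up record shaped like one row of `df` (not taken from the real data)
record = {
    "restaurant_id": "00000001",
    "name": "Sample Diner",
    "grades": [
        {"date": {"$date": 1421712000000}, "grade": "A", "score": 5},
        {"date": {"$date": 1419984000000}, "grade": "B", "score": 20},
    ],
}

def flatten_grades(rec):
    """Emit one flat row per element of rec['grades'], like explode() + select()."""
    rows = []
    for g in rec["grades"]:  # an empty list yields no rows, as with explode()
        seconds = g["date"]["$date"] / 1000  # epoch milliseconds -> seconds
        day = datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d")
        rows.append((rec["restaurant_id"], rec["name"], day, g["grade"], g["score"]))
    return rows

rows = flatten_grades(record)
for row in rows:
    print(row)
```

The restaurant-level fields (`restaurant_id`, `name`) are repeated on every output row, while each grade contributes its own `date`, `grade` and `score`.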



In [22]:
transformed_df.createOrReplaceTempView("transformed_df_view")


## Task 3

The Michelin group wants to award the best restaurant with a Star. A
Michelin Star recognizes restaurants for exceptional cooking. It
considers ingredient quality, flavor harmony, technique mastery, chef's
personality in cuisine, and consistency across the menu and over time.
The manager of the group asks you to find the best restaurant(s) in
the streaming data. Is it possible? If your answer is yes, what would be
your data query strategy to fulfill the request?

-   Explain the steps of the strategy and write the corresponding query
    for each step. Use the sample operations from Exercise 8 in TD2.

The sample operations were:

-   Filtering rows where the grade is 'A'

In [24]:

df1 = transformed_df.filter(transformed_df['grade'] == 'A')
df1.show(5)


+-------------+--------------------+----------+-----+-----+
|restaurant_id|                name|      date|grade|score|
+-------------+--------------------+----------+-----+-----+
|     50018581|            Pad Thai|2015-01-08|    A|    5|
|     50018324|   Shelley'S Kitchen|2015-01-16|    A|   13|
|     50018608|Trini Delite Roti...|2015-01-15|    A|   12|
|     50018555|              Carvel|2014-12-31|    A|   11|
|     50018266|       Sun'S Kitchen|2014-12-17|    A|    3|
+-------------+--------------------+----------+-----+-----+
only showing top 5 rows



In [25]:
df_1_grade_A = spark.sql("SELECT * FROM transformed_df_view WHERE grade = 'A'")

df_1_grade_A.show(5)

+-------------+--------------------+----------+-----+-----+
|restaurant_id|                name|      date|grade|score|
+-------------+--------------------+----------+-----+-----+
|     50018581|            Pad Thai|2015-01-08|    A|    5|
|     50018324|   Shelley'S Kitchen|2015-01-16|    A|   13|
|     50018608|Trini Delite Roti...|2015-01-15|    A|   12|
|     50018555|              Carvel|2014-12-31|    A|   11|
|     50018266|       Sun'S Kitchen|2014-12-17|    A|    3|
+-------------+--------------------+----------+-----+-----+
only showing top 5 rows



-   Where clause to select records after a specific date:

In [19]:
from pyspark.sql.functions import to_date

# Keep only the grades dated after 2014-01-01
df1 = transformed_df.where(to_date(transformed_df['date'], 'yyyy-MM-dd') > '2014-01-01')
df1.show(5)



+-------------+--------------------+----------+--------------+-----+
|restaurant_id|                name|      date|         grade|score|
+-------------+--------------------+----------+--------------+-----+
|     50018754|El Coral Deli Res...|2015-01-20|Not Yet Graded|    3|
|     50018661|      Angebienvendia|2015-01-20|Not Yet Graded|    2|
|     50018581|            Pad Thai|2015-01-08|             A|    5|
|     50018468|      Savour Sichuan|2015-01-20|Not Yet Graded|   56|
|     50018480|          ''W'' Cafe|2015-01-20|Not Yet Graded|   43|
+-------------+--------------------+----------+--------------+-----+
only showing top 5 rows



In [23]:
df1_sql = spark.sql("SELECT * FROM transformed_df_view WHERE date > '2014-01-01'")

df1_sql.show(5)

+-------------+--------------------+----------+--------------+-----+
|restaurant_id|                name|      date|         grade|score|
+-------------+--------------------+----------+--------------+-----+
|     50018754|El Coral Deli Res...|2015-01-20|Not Yet Graded|    3|
|     50018661|      Angebienvendia|2015-01-20|Not Yet Graded|    2|
|     50018581|            Pad Thai|2015-01-08|             A|    5|
|     50018468|      Savour Sichuan|2015-01-20|Not Yet Graded|   56|
|     50018480|          ''W'' Cafe|2015-01-20|Not Yet Graded|   43|
+-------------+--------------------+----------+--------------+-----+
only showing top 5 rows
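
The DataFrame filter compares dates, while the SQL filter compares the `date` strings directly. The two agree here because zero-padded `yyyy-MM-dd` strings sort alphabetically in chronological order. A small plain-Python sketch of that property, using made-up dates rather than values from the stream:

```python
from datetime import date

dates = ["2015-01-20", "2013-06-30", "2014-12-31", "2014-01-01"]  # made-up values
cutoff = "2014-01-01"

# Comparison on the raw strings (what the SQL query does)
as_strings = [d for d in dates if d > cutoff]

# Comparison on parsed dates (what the to_date() version does)
as_dates = [d for d in dates
            if date.fromisoformat(d) > date.fromisoformat(cutoff)]

print(as_strings)
print(as_strings == as_dates)
```

This shortcut would not hold for other layouts such as `dd/MM/yyyy`, where parsing the string into a date first is the safer choice.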



-   Aggregating to find the average score for each restaurant:

In [26]:
from pyspark.sql import functions as F

# One row per restaurant with the mean of all its scores
df1 = transformed_df.groupBy('restaurant_id').agg(F.avg('score').alias('avg_score'))
df1.show(5)



+-------------+---------+
|restaurant_id|avg_score|
+-------------+---------+
|     50018404|     23.0|
|     50018376|     58.0|
|     50018318|     20.0|
|     50018344|      2.0|
|     50018355|      0.0|
+-------------+---------+
only showing top 5 rows



In [27]:
df1_sql_avg=spark.sql("SELECT restaurant_id, MEAN(score) as avg_score FROM transformed_df_view GROUP BY restaurant_id")

df1_sql_avg.show(5)

+-------------+---------+
|restaurant_id|avg_score|
+-------------+---------+
|     50018404|     23.0|
|     50018376|     58.0|
|     50018318|     20.0|
|     50018344|      2.0|
|     50018355|      0.0|
+-------------+---------+
only showing top 5 rows



-   Using regexp_replace to clean up restaurant names: The
    F.regexp_replace function replaces every [ or ] found in
    the name column with an empty string '', effectively removing these
    characters from the string.

In [None]:
# Strip '[' and ']' from the restaurant names
df1 = transformed_df.withColumn('name_clean', F.regexp_replace('name', r'(\[|\])', ''))
df1.show(5)
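
The pattern can be tried outside Spark with Python's `re` module, which interprets this particular expression the same way. A small sketch with made-up restaurant names:

```python
import re

pattern = r'(\[|\])'  # same pattern as in the regexp_replace call
names = ["[Closed] Corner Cafe", "Plain Name", "Deli [24h]"]  # made-up examples

# Remove every '[' and ']' while leaving the rest of the name untouched
cleaned = [re.sub(pattern, '', n) for n in names]
print(cleaned)
```

Names without brackets pass through unchanged, so the operation is safe to apply to the whole column.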



-   Applying conditional logic with when to create a new
    'score_category' column

In [None]:
# Conditions are checked in order: below 10 is 'Low', 10 to 12 is 'Medium', anything else is 'High'
df1 = transformed_df.withColumn('score_category', F.when(transformed_df['score'] < 10, 'Low')
                                .when(transformed_df['score'] < 13, 'Medium')
                                .otherwise('High'))
df1.show(5)
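
The `when` chain behaves like an if/elif/else ladder: the first condition that holds decides the category. A plain-Python sketch of the same thresholds (the function name `score_category` is illustrative). It does not cover missing scores, which in Spark would fall through to `otherwise`.

```python
def score_category(score):
    """Plain-Python counterpart of the when/otherwise chain."""
    if score < 10:
        return 'Low'
    if score < 13:  # only reached when score >= 10
        return 'Medium'
    return 'High'

for s in (2, 9, 10, 12, 13, 58):
    print(s, score_category(s))
```

Because the checks run in order, swapping the two conditions would label every score below 13 as 'Medium' and leave 'Low' unreachable.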

