# Pyspark Streaming Food data 

In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Streaming Process Files") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .master("local[*]") \
    .getOrCreate()

spark

24/01/13 16:38:23 WARN Utils: Your hostname, Ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/01/13 16:38:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/13 16:38:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/13 16:38:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
df_pyspark=spark.read.csv('final_train.csv', header=True, inferSchema=True)
df_pyspark.show()

                                                                                

+-------+----+---------+---------+-----------+-----------+-------+-------+---------+-------+--------------+----------+---------------------+-----------------+----------+
|     id|week|center_id|city_code|region_code|center_type|op_area|meal_id| category|cuisine|checkout_price|base_price|emailer_for_promotion|homepage_featured|num_orders|
+-------+----+---------+---------+-----------+-----------+-------+-------+---------+-------+--------------+----------+---------------------+-----------------+----------+
|1379560|   1|       55|      647|         56|     TYPE_C|    2.0|   1885|Beverages|   Thai|        136.83|    152.29|                    0|                0|       177|
|1018704|   2|       55|      647|         56|     TYPE_C|    2.0|   1885|Beverages|   Thai|        135.83|    152.29|                    0|                0|       323|
|1196273|   3|       55|      647|         56|     TYPE_C|    2.0|   1885|Beverages|   Thai|        132.92|    133.92|                    0|          

In [3]:
df_pyspark.describe().show()

24/01/13 16:39:36 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 5:>                                                          (0 + 1) / 1]

+-------+------------------+-----------------+-----------------+-----------------+------------------+-----------+-----------------+------------------+---------+-----------+------------------+------------------+---------------------+-------------------+-----------------+
|summary|                id|             week|        center_id|        city_code|       region_code|center_type|          op_area|           meal_id| category|    cuisine|    checkout_price|        base_price|emailer_for_promotion|  homepage_featured|       num_orders|
+-------+------------------+-----------------+-----------------+-----------------+------------------+-----------+-----------------+------------------+---------+-----------+------------------+------------------+---------------------+-------------------+-----------------+
|  count|            456548|           456548|           456548|           456548|            456548|     456548|           456548|            456548|   456548|     456548|            456

                                                                                

- We don't need last two columns 'isFraud' and 'isFlaggedFraud' as for streaming systems

- Step column maps a unit of time in the real world. In this case let us assume that the value of '1' in step represents 1 hour of time.
- So we can assume for this example that we have another job that runs every hour and gets all the transactions in that time frame.

In [5]:
from pyspark.sql.functions import sum, col, desc
df_pyspark.groupBy('week').count().sort(desc('count')).show()



+----+-----+
|week|count|
+----+-----+
| 122| 3359|
| 105| 3348|
| 106| 3347|
| 140| 3332|
| 123| 3331|
| 134| 3330|
| 133| 3324|
| 113| 3312|
| 100| 3309|
| 143| 3305|
|  94| 3303|
| 144| 3302|
| 114| 3300|
| 109| 3299|
| 121| 3298|
| 131| 3293|
| 110| 3293|
| 129| 3291|
| 117| 3290|
|  91| 3289|
+----+-----+
only showing top 20 rows



                                                                                

- We can clearly see the number of transactions(count) for each step
- We can now save the output by filtering on each step and saving it as a seperate file

In [6]:
weeks = df_pyspark.select('week').distinct().collect()
print(weeks)
for week in weeks:
    df_upt = df_pyspark.where(f'week = {week[0]}')
    # By addig coalesce(1) we save the dataframe to one file
    df_upt.coalesce(1).write.mode('append').option('header', 'true').csv('data/food_data_upt')


                                                                                

[Row(week=31), Row(week=85), Row(week=137), Row(week=65), Row(week=53), Row(week=133), Row(week=78), Row(week=108), Row(week=34), Row(week=101), Row(week=115), Row(week=126), Row(week=81), Row(week=28), Row(week=76), Row(week=26), Row(week=27), Row(week=44), Row(week=103), Row(week=12), Row(week=91), Row(week=22), Row(week=128), Row(week=122), Row(week=93), Row(week=111), Row(week=47), Row(week=140), Row(week=132), Row(week=1), Row(week=52), Row(week=13), Row(week=6), Row(week=16), Row(week=86), Row(week=3), Row(week=142), Row(week=20), Row(week=40), Row(week=139), Row(week=94), Row(week=57), Row(week=54), Row(week=120), Row(week=96), Row(week=48), Row(week=5), Row(week=19), Row(week=92), Row(week=64), Row(week=117), Row(week=41), Row(week=15), Row(week=43), Row(week=112), Row(week=37), Row(week=61), Row(week=127), Row(week=88), Row(week=107), Row(week=9), Row(week=17), Row(week=72), Row(week=35), Row(week=114), Row(week=4), Row(week=55), Row(week=59), Row(week=8), Row(week=100), Row(w

                                                                                

In [8]:
!cd data/food_data_upt/ && ls

part-00000-03b67e06-404f-41c2-9aa3-f694aa319792-c000.csv
part-00000-07032cf7-1e73-4288-b965-e3bc399866fb-c000.csv
part-00000-084ca283-4bec-402e-8df9-1742389ebb8f-c000.csv
part-00000-08ecd9c0-989f-4815-956d-c6e4bc357aa0-c000.csv
part-00000-092553ac-a24c-47dc-b7f9-359c87ffec1b-c000.csv
part-00000-0a482730-8851-4491-b9bc-0b4489a7097c-c000.csv
part-00000-0b470524-6494-4afe-8a24-3dd7b2a27aef-c000.csv
part-00000-0fd1c8b4-781e-4192-a130-28bd2cbc173e-c000.csv
part-00000-12b64b7f-7e6d-4cd5-a9cd-96dd0d2bbbd8-c000.csv
part-00000-18484a15-965b-4127-9409-8ee46a7b6ab9-c000.csv
part-00000-1a1996cd-ca93-46eb-9295-27354e0129ad-c000.csv
part-00000-1b3833c7-c251-4a8b-a506-a026e98dacd2-c000.csv
part-00000-1b94c717-9bdd-4cf9-a3c7-e61cdb13521d-c000.csv
part-00000-1e078d57-3c2c-4b8a-bdc0-6a897360af64-c000.csv
part-00000-1f9bcffc-ff03-4936-81b7-1205086adffe-c000.csv
part-00000-274a5446-8f78-4825-a87d-8d0696e6f266-c000.csv
part-00000-28c0ff22-37a3-4358-8662-f16d18aa6505-c000.csv
part-00000-2a38427e-96bf-43eb-a

In [14]:
df_partition= spark.read.csv(
    "data/food_data_upt/part-00000-092553ac-a24c-47dc-b7f9-359c87ffec1b-c000.csv",
    header=True, 
    inferSchema=True,

)
df_partition.show()

                                                                                

+-------+----+---------+---------+-----------+-----------+-------+-------+---------+-------+--------------+----------+---------------------+-----------------+----------+
|     id|week|center_id|city_code|region_code|center_type|op_area|meal_id| category|cuisine|checkout_price|base_price|emailer_for_promotion|homepage_featured|num_orders|
+-------+----+---------+---------+-----------+-----------+-------+-------+---------+-------+--------------+----------+---------------------+-----------------+----------+
|1211414|  33|       55|      647|         56|     TYPE_C|    2.0|   1885|Beverages|   Thai|        123.25|    122.25|                    0|                0|       215|
|1212312|  33|       24|      614|         85|     TYPE_B|    3.6|   1885|Beverages|   Thai|         146.5|     146.5|                    0|                0|      1242|
|1117798|  33|       11|      679|         56|     TYPE_A|    3.7|   1885|Beverages|   Thai|        121.28|    120.28|                    0|          

In [15]:
df_partition.groupBy('week').count().show()

[Stage 162:>                                                        (0 + 1) / 1]

+----+-----+
|week|count|
+----+-----+
|  33| 3045|
+----+-----+



                                                                                

In [16]:
# Schema
dataSchema = df_partition.schema
dataSchema

StructType([StructField('id', IntegerType(), True), StructField('week', IntegerType(), True), StructField('center_id', IntegerType(), True), StructField('city_code', IntegerType(), True), StructField('region_code', IntegerType(), True), StructField('center_type', StringType(), True), StructField('op_area', DoubleType(), True), StructField('meal_id', IntegerType(), True), StructField('category', StringType(), True), StructField('cuisine', StringType(), True), StructField('checkout_price', DoubleType(), True), StructField('base_price', DoubleType(), True), StructField('emailer_for_promotion', IntegerType(), True), StructField('homepage_featured', IntegerType(), True), StructField('num_orders', IntegerType(), True)])

In [17]:
stream = (
        spark.readStream.schema(dataSchema) \
        .option('maxFilesPerTrigger',1) \
        .csv('data/food_data_upt/')
)