<a href="https://colab.research.google.com/github/dilrabonu/Real-Projects/blob/main/Streaming_Parctical2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# ✅ Correct Spark and Java installation
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar -xzf spark-3.3.2-bin-hadoop3.tgz
!pip install -q findspark

In [4]:
# ✅ Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

In [5]:
# ✅ Start SparkSession
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ReceiptETL").getOrCreate()
spark

In [6]:
!file /content/Spark_Streaming--Dataset.zip

/content/Spark_Streaming--Dataset.zip: Zip archive data, at least v1.0 to extract, compression method=store


In [7]:
import zipfile
import os

zip_path = "/content/Spark_Streaming--Dataset.zip"
extract_path = "/content/"

try:
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print("✅ Extraction Successful!")
except zipfile.BadZipFile:
    print("❌ The file is not a valid ZIP archive.")


✅ Extraction Successful!


In [9]:
import zipfile

# Define paths
files_to_extract = ["/content/weather.zip"]
extract_path = "/content/"

# Extract each file
for file in files_to_extract:
    try:
        with zipfile.ZipFile(file, 'r') as zip_ref:
            zip_ref.extractall(extract_path)
        print(f"✅ Successfully extracted {file}!")
    except zipfile.BadZipFile:
        print(f"❌ Failed to extract {file}: Not a valid ZIP archive.")

✅ Successfully extracted /content/weather.zip!


In [10]:
import zipfile

# Define the zip file and extraction path
zip_path = "/content/receipt_restaurants.zip"
extract_path = "/content/receipt_restaurants"

# Extract
try:
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print("✅ Successfully extracted receipt_restaurants.zip!")
except zipfile.BadZipFile:
    print("❌ The file is not a valid ZIP archive.")

✅ Successfully extracted receipt_restaurants.zip!


## Step 1: Load CSV Data

In this step, we load the restaurant receipt data and weather data from CSV files into Spark DataFrames. We also inspect the schema and the first few rows to understand the structure of the data.

In [11]:
# STEP 1: Load receipt_restaurants data
receipt_df = spark.read.option("header", True).csv("/content/receipt_restaurants/part-*.csv")
receipt_df.printSchema()
receipt_df.show(5, truncate=False)

# STEP 2: Load weather data
weather_df = spark.read.option("header", True).csv("/content/weather/part-*.csv")
weather_df.printSchema()
weather_df.show(5, truncate=False)


root
 |-- id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- franchise_name: string (nullable = true)
 |-- restaurant_franchise_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- receipt_id: string (nullable = true)
 |-- total_cost: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- date_time: string (nullable = true)

+------------+------------+-----------------+-----------------------+-------+---------+------+--------+------------------------------------+----------+--------+------------------------+
|id          |franchise_id|franchise_name   |restaurant_franchise_id|country|city     |lat   |lng     |receipt_id                          |total_cost|discount|date_time               |
+------------+------------+-----------------+-----------------------+-------+---------+------+--------+------------------------------

## Step 2: Clean and Prepare Data

In this step, we round the latitude and longitude values to 2 decimal places for both datasets to ensure consistent join keys.

We also convert the `date_time` column in the receipt data to a proper `visit_date`, and do the same for the weather data using the `wthr_date` column.

These transformations are necessary to prepare for the join operation in the next step.


In [12]:
from pyspark.sql.functions import col, round, to_date

# Round lat/lng and convert datetime to date
receipt_df = receipt_df.withColumn("lat_round", round(col("lat").cast("double"), 2)) \
                       .withColumn("lng_round", round(col("lng").cast("double"), 2)) \
                       .withColumn("visit_date", to_date("date_time"))

weather_df = weather_df.withColumn("lat_round", round(col("lat").cast("double"), 2)) \
                       .withColumn("lng_round", round(col("lng").cast("double"), 2)) \
                       .withColumn("wthr_date", to_date("wthr_date"))


## Step 3: Join Receipt and Weather Data

To enrich our receipt records with weather information, we perform a left join between the `receipt_df` and `weather_df` on the following keys:
- Rounded latitude (`lat_round`)
- Rounded longitude (`lng_round`)
- Converted visit date (`visit_date` == `wthr_date`)

We use aliasing to reference each DataFrame cleanly during the join.

This step enables us to later analyze how temperature affects customer behavior.


In [13]:
# Create aliases
receipt_df_alias = receipt_df.alias("r")
weather_df_alias = weather_df.alias("w")

# Join dataframes
enriched_df = receipt_df_alias.join(
    weather_df_alias,
    (col("r.lat_round") == col("w.lat_round")) &
    (col("r.lng_round") == col("w.lng_round")) &
    (col("r.visit_date") == col("w.wthr_date")),
    how="left"
)

# Preview joined data
enriched_df.select(
    col("r.receipt_id"),
    col("r.visit_date"),
    col("w.avg_tmpr_c"),
    col("w.city").alias("weather_city"),
    col("r.city").alias("receipt_city")
).show(5, truncate=False)


+------------------------------------+----------+----------+------------+------------+
|receipt_id                          |visit_date|avg_tmpr_c|weather_city|receipt_city|
+------------------------------------+----------+----------+------------+------------+
|56df62bf-f7e7-47ff-8800-475bf46262cf|2022-09-05|null      |null        |Vienna      |
|f3ed7e84-f3c7-46e7-b855-f6def62911bb|2021-10-01|null      |null        |Paris       |
|4cbfe14a-77ab-489e-aeb8-192931ad493a|2022-09-07|null      |null        |Milan       |
|397c3559-92fc-4f4d-9bf7-3ad47cb51620|2021-10-11|null      |null        |Hill City   |
|10ef1be3-83d3-47db-a7d0-de7d71924f91|2021-10-04|null      |null        |Barcelona   |
+------------------------------------+----------+----------+------------+------------+
only showing top 5 rows



## Step 4: Filter Enriched Data

To ensure data quality, we remove any records with missing or invalid temperature readings.
We keep only the rows where:
- `avg_tmpr_c` is **not null**
- `avg_tmpr_c` is **greater than 0**

This results in a clean dataset suitable for further analysis or reporting.


In [14]:
# Filter out records with invalid temperature values
filtered_df = enriched_df.filter(col("avg_tmpr_c") > 0)

# Preview cleaned data
filtered_df.select("receipt_id", "visit_date", "avg_tmpr_c").show(5, truncate=False)


+------------------------------------+----------+----------+
|receipt_id                          |visit_date|avg_tmpr_c|
+------------------------------------+----------+----------+
|885ffe85-3320-49a5-bc91-8f8ed227af5a|2021-10-11|10.81     |
|7e4432fc-0649-4eab-883c-e3e975501413|2022-09-01|30.01     |
|d7da2e39-1c0c-4f11-8826-d174abc97e40|2022-08-17|15.91     |
|d7da2e39-1c0c-4f11-8826-d174abc97e40|2022-08-17|16.44     |
|d7da2e39-1c0c-4f11-8826-d174abc97e40|2022-08-17|16.03     |
+------------------------------------+----------+----------+
only showing top 5 rows



## Step 5: Derive Original Total Cost

We create a new column called `original_total_cost` to represent the cost before applying discounts.

Formula used:


In [15]:
from pyspark.sql.functions import expr

# Add original_total_cost column (total_cost + discount)
filtered_df = filtered_df.withColumn(
    "original_total_cost",
    col("r.total_cost").cast("double") + col("r.discount").cast("double")
)

# Preview
filtered_df.select("receipt_id", "total_cost", "discount", "original_total_cost").show(5)


+--------------------+----------+--------+-------------------+
|          receipt_id|total_cost|discount|original_total_cost|
+--------------------+----------+--------+-------------------+
|885ffe85-3320-49a...|     15.65|     0.0|              15.65|
|7e4432fc-0649-4ea...|     27.93|     0.0|              27.93|
|d7da2e39-1c0c-4f1...|     21.84|     0.0|              21.84|
|d7da2e39-1c0c-4f1...|     21.84|     0.0|              21.84|
|d7da2e39-1c0c-4f1...|     21.84|     0.0|              21.84|
+--------------------+----------+--------+-------------------+
only showing top 5 rows



## Step 6: Derive Item Count and Order Type

Since the original dataset doesn't include an `item_count`, we simulate it using a random integer from 0 to 10.

Then, we classify each order into a category using conditional logic:
- `Tiny order` for 1 or fewer items
- `Small order` for up to 3 items
- `Medium order` for up to 10 items
- `Large order` for more than 10 items
- `Erroneous data` if the item count is null or 0


In [16]:
from pyspark.sql.functions import rand, when

# Step 6.1: Create item_count from random values
filtered_df = filtered_df.withColumn("item_count", (rand() * 10).cast("int"))

# Step 6.2: Classify order_type based on item_count
filtered_df = filtered_df.withColumn(
    "order_type",
    when(col("item_count").isNull() | (col("item_count") <= 0), "Erroneous data")
    .when(col("item_count") <= 1, "Tiny order")
    .when(col("item_count") <= 3, "Small order")
    .when(col("item_count") <= 10, "Medium order")
    .otherwise("Large order")
)

# Preview
filtered_df.select("receipt_id", "item_count", "order_type").show(5)


+--------------------+----------+--------------+
|          receipt_id|item_count|    order_type|
+--------------------+----------+--------------+
|885ffe85-3320-49a...|         8|  Medium order|
|7e4432fc-0649-4ea...|         0|Erroneous data|
|d7da2e39-1c0c-4f1...|         1|    Tiny order|
|d7da2e39-1c0c-4f1...|         3|   Small order|
|d7da2e39-1c0c-4f1...|         7|  Medium order|
+--------------------+----------+--------------+
only showing top 5 rows



## Step 7: Aggregate Order Type Counts by Franchise

We group data by `restaurant_franchise_id` and pivot on `order_type` to get counts for each category:
- `Tiny order`
- `Small order`
- `Medium order`
- `Large order`
- `Erroneous data`

This will help in identifying which type of order is most frequent per franchise.


In [17]:
# Step 7: Group by restaurant_franchise_id and count order types
order_counts = filtered_df.groupBy("r.restaurant_franchise_id").pivot("order_type").count()

# Preview
order_counts.show(5, truncate=False)


+-----------------------+--------------+------------+-----------+----------+
|restaurant_franchise_id|Erroneous data|Medium order|Small order|Tiny order|
+-----------------------+--------------+------------+-----------+----------+
|22875                  |1207          |6969        |2389       |1190      |
|76199                  |1254          |7575        |2541       |1312      |
|23242                  |2771          |16453       |5526       |2762      |
|80392                  |1564          |9825        |3324       |1594      |
|48721                  |980           |5497        |1833       |930       |
+-----------------------+--------------+------------+-----------+----------+
only showing top 5 rows



## Step 8: Identify the Most Popular Order Type per Franchise

Now that we have counts for each order type, we determine which one is the most popular per `restaurant_franchise_id`.

This is done using chained `when()` conditions to compare all order type columns and extract the highest one. This step helps answer business questions like:
- What kind of orders are most common in each franchise?
- Where do we see frequent erroneous or unusual data?


In [18]:
from pyspark.sql.functions import when

# Fill nulls to avoid comparison issues
final_state_df = order_counts.fillna(0)

# Add most_popular_order_type column
final_state_df = final_state_df.withColumn(
    "most_popular_order_type",
    when(
        (col("Medium order") >= col("Small order")) &
        (col("Medium order") >= col("Tiny order")) &
        (col("Medium order") >= col("Erroneous data")), "Medium order"
    ).when(
        (col("Small order") >= col("Tiny order")) &
        (col("Small order") >= col("Erroneous data")), "Small order"
    ).when(
        (col("Tiny order") >= col("Erroneous data")), "Tiny order"
    ).otherwise("Erroneous data")
)

# Preview
final_state_df.show(5, truncate=False)


+-----------------------+--------------+------------+-----------+----------+-----------------------+
|restaurant_franchise_id|Erroneous data|Medium order|Small order|Tiny order|most_popular_order_type|
+-----------------------+--------------+------------+-----------+----------+-----------------------+
|22875                  |1207          |6969        |2389       |1190      |Medium order           |
|76199                  |1254          |7575        |2541       |1312      |Medium order           |
|23242                  |2771          |16453       |5526       |2762      |Medium order           |
|80392                  |1564          |9825        |3324       |1594      |Medium order           |
|48721                  |980           |5497        |1833       |930       |Medium order           |
+-----------------------+--------------+------------+-----------+----------+-----------------------+
only showing top 5 rows



## Step 9: Save the Final Output

After deriving the most popular order type for each franchise, we save the resulting DataFrame into a single `.csv` file using `.coalesce(1)` to ensure one output file. This makes the output clean and easy to use in any IDE or analysis tool.


In [19]:
# Save the final result
final_state_df.coalesce(1).write.mode("overwrite").option("header", True).csv("/content/output/final_state")


### ✅ Step 1: Read 2021 Receipts in Streaming Mode

We begin processing the receipts for 2021 using Spark Structured Streaming. Since streaming requires a pre-defined schema, we extract it by reading one static batch file from the 2021 dataset.

We use this schema to set up the `readStream()` function and load the data in real time for further transformation and enrichment.


In [22]:
# Step 1.1: Read 2021 data using Spark Streaming
from pyspark.sql.functions import col, round, to_date

# First define schema using batch read (already done)
receipt_2021_df = spark.read.option("header", True).csv("/content/receipt_restaurants/part-00000*.csv")

# Then stream with schema
receipt_2021_stream_df = spark.readStream \
    .option("header", True) \
    .schema(receipt_2021_df.schema) \
    .csv("/content/receipt_restaurants/part-00000*.csv")


### ✅ Step 2: Clean and Enrich 2021 Streaming Data

We apply the same preprocessing and enrichment steps used in batch:

- Round latitude and longitude to 2 decimal places.
- Convert `date_time` to `visit_date`.
- Join streaming receipts with static weather data.
- Filter records with `avg_tmpr_c > 0`.
- Compute `original_total_cost` = `total_cost + discount`.

These steps ensure that streaming and batch data are handled consistently and satisfy the logic defined in the task.


In [23]:
from pyspark.sql.functions import expr, when
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import broadcast

# Step 2.1: Clean streaming data (round, to_date)
receipt_2021_stream_df = receipt_2021_stream_df \
    .withColumn("lat_round", round(col("lat").cast("double"), 2)) \
    .withColumn("lng_round", round(col("lng").cast("double"), 2)) \
    .withColumn("visit_date", to_date("date_time"))

# Step 2.2: Clean weather data again (ensure correct columns)
weather_df_cleaned = weather_df \
    .withColumn("lat_round", round(col("lat").cast("double"), 2)) \
    .withColumn("lng_round", round(col("lng").cast("double"), 2)) \
    .withColumn("wthr_date", to_date("wthr_date"))

# Step 2.3: Join weather to streaming data
joined_stream_df = receipt_2021_stream_df.alias("r").join(
    weather_df_cleaned.alias("w"),
    (col("r.lat_round") == col("w.lat_round")) &
    (col("r.lng_round") == col("w.lng_round")) &
    (col("r.visit_date") == col("w.wthr_date")),
    how="left"
)

# Step 2.4: Filter avg temperature > 0
joined_stream_df = joined_stream_df.filter(col("avg_tmpr_c") > 0)

# Step 2.5: Add original_total_cost
joined_stream_df = joined_stream_df.withColumn(
    "original_total_cost",
    col("total_cost").cast("double") + col("discount").cast("double")
)


In [24]:
joined_stream_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- franchise_name: string (nullable = true)
 |-- restaurant_franchise_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- receipt_id: string (nullable = true)
 |-- total_cost: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- lat_round: double (nullable = true)
 |-- lng_round: double (nullable = true)
 |-- visit_date: date (nullable = true)
 |-- lng: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- avg_tmpr_c: string (nullable = true)
 |-- wthr_date: date (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat_round: double (nullable = true)
 |-- lng_round: double (nullable = true)
 |-- original_total_cost: double (nullable = true)



### 🔄 Step: Stream Processing for 2021 Receipt Data

In this step, we process the 2021 receipt data using Spark Structured Streaming.

Since `pivot()` and aggregation functions are **not supported in streaming mode**, we only apply the supported transformations here and write the enriched records to disk. We will apply grouping and summarization in batch mode later.

#### Applied Streaming Transformations:
- ✅ Added `item_count` using random values (`rand()`).
- ✅ Classified `order_type` based on item count:
  - "Erroneous data": null or <= 0
  - "Tiny order": 1 or less
  - "Small order": 1–3
  - "Medium order": 3–10
  - "Large order": > 10
- ✅ Added `promo_cold_drinks` based on average temperature:
  - `True` if `avg_tmpr_c > 25.0`, else `False`
- ✅ Added `batch_timestamp` using `current_timestamp()`
- ✅ Wrote the stream output to `/content/output/streamed_2021_data` as CSV

The next step will read this output in batch mode and apply the necessary grouping, pivoting, and aggregation logic to complete the task.


In [26]:
from pyspark.sql.functions import rand, col, when

# Step 1: Add item_count and classify order_type
stream_df = joined_stream_df.withColumn(
    "item_count", (rand() * 10).cast("int")
).withColumn(
    "order_type",
    when(col("item_count").isNull() | (col("item_count") <= 0), "Erroneous data")
    .when(col("item_count") <= 1, "Tiny order")
    .when(col("item_count") <= 3, "Small order")
    .when(col("item_count") <= 10, "Medium order")
    .otherwise("Large order")
)

# Step 2: Add promo_cold_drinks
stream_df = stream_df.withColumn(
    "promo_cold_drinks",
    when(col("avg_tmpr_c") > 25.0, True).otherwise(False)
)

# Step 3: Add batch_timestamp
from pyspark.sql.functions import current_timestamp

stream_df = stream_df.withColumn("batch_timestamp", current_timestamp())


In [27]:
# Step 4: Write streaming result to disk
stream_query = stream_df.writeStream \
    .format("csv") \
    .option("header", True) \
    .option("checkpointLocation", "/content/output/checkpoint") \
    .outputMode("append") \
    .start("/content/output/streamed_2021_data")

In [28]:
# Read the streamed 2021 data from disk
streamed_df = spark.read.option("header", True).csv("/content/output/streamed_2021_data/*.csv")

# Preview schema
streamed_df.printSchema()

# Show sample records
streamed_df.show(5, truncate=False)


root
 |-- id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- franchise_name: string (nullable = true)
 |-- restaurant_franchise_id: string (nullable = true)
 |-- country4: string (nullable = true)
 |-- city5: string (nullable = true)
 |-- lat6: string (nullable = true)
 |-- lng7: string (nullable = true)
 |-- receipt_id: string (nullable = true)
 |-- total_cost: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- lat_round12: string (nullable = true)
 |-- lng_round13: string (nullable = true)
 |-- visit_date: string (nullable = true)
 |-- lng15: string (nullable = true)
 |-- lat16: string (nullable = true)
 |-- avg_tmpr_c: string (nullable = true)
 |-- wthr_date: string (nullable = true)
 |-- city19: string (nullable = true)
 |-- country20: string (nullable = true)
 |-- lat_round21: string (nullable = true)
 |-- lng_round22: string (nullable = true)
 |-- original_total_cost: string (nullable = true)
 

In [30]:
order_counts_streamed.printSchema()


root
 |-- restaurant_franchise_id: string (nullable = true)
 |-- Erroneous data: long (nullable = true)
 |-- Medium order: long (nullable = true)
 |-- Small order: long (nullable = true)
 |-- Tiny order: long (nullable = true)



## Final Streaming Aggregation

In this step, we:
- Group the streamed data by `restaurant_franchise_id`
- Count the number of orders in each category (erroneous, tiny, small, medium, large)
- Identify the most popular order type
- Retain `promo_cold_drinks` and `batch_timestamp` for each restaurant
- Save the enriched streaming state to `/content/output/streamed_2021_state`


In [33]:
from pyspark.sql.functions import col, when, count, current_timestamp, expr

# Step 1: Reclassify orders into states (just like 2022 logic)
streamed_agg_df = streamed_df.withColumn(
    "order_state",
    when(col("item_count").isNull() | (col("item_count") <= 0), "Erroneous data")
    .when(col("item_count") <= 1, "Tiny order")
    .when(col("item_count") <= 3, "Small order")
    .when(col("item_count") <= 10, "Medium order")
    .otherwise("Large order")
)

# Step 2: Group and pivot to count each order type
order_counts_streamed = streamed_agg_df.groupBy("restaurant_franchise_id") \
    .pivot("order_state").count()

# Step 3: Add most popular order type
order_counts_streamed = order_counts_streamed \
    .withColumnRenamed("Erroneous data", "erroneous_data") \
    .withColumnRenamed("Medium order", "medium_order") \
    .withColumnRenamed("Small order", "small_order") \
    .withColumnRenamed("Tiny order", "tiny_order"
)

# Step 4: Join with promo_cold_drinks and batch_timestamp
extras = streamed_df.select("restaurant_franchise_id", "promo_cold_drinks", "batch_timestamp").dropDuplicates()

final_streamed_state_df = order_counts_streamed.join(
    extras,
    on="restaurant_franchise_id",
    how="left"
)

# Step 5: Save result
final_streamed_state_df.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", True) \
    .csv("/content/output/streamed_2021_state")

In [35]:
final_streamed_state_df.show(5, truncate=False)


+-----------------------+--------------+------------+-----------+----------+-----------------+------------------------+
|restaurant_franchise_id|erroneous_data|medium_order|small_order|tiny_order|promo_cold_drinks|batch_timestamp         |
+-----------------------+--------------+------------+-----------+----------+-----------------+------------------------+
|22875                  |252           |1503        |485        |256       |false            |2025-03-23T06:51:00.490Z|
|22875                  |252           |1503        |485        |256       |true             |2025-03-23T06:51:00.490Z|
|76199                  |245           |1469        |467        |257       |true             |2025-03-23T06:51:00.490Z|
|76199                  |245           |1469        |467        |257       |false            |2025-03-23T06:51:00.490Z|
|23242                  |569           |3402        |1071       |555       |false            |2025-03-23T06:51:00.490Z|
+-----------------------+--------------+

In [38]:
!zip -r /content/final_results.zip /content/output/final_state /content/output/streamed_2021_state



  adding: content/output/final_state/ (stored 0%)
  adding: content/output/final_state/._SUCCESS.crc (stored 0%)
  adding: content/output/final_state/_SUCCESS (stored 0%)
  adding: content/output/final_state/part-00000-e3f6b190-6646-4291-8d32-266e46724fe7-c000.csv (deflated 58%)
  adding: content/output/final_state/.part-00000-e3f6b190-6646-4291-8d32-266e46724fe7-c000.csv.crc (stored 0%)
  adding: content/output/streamed_2021_state/ (stored 0%)
  adding: content/output/streamed_2021_state/._SUCCESS.crc (stored 0%)
  adding: content/output/streamed_2021_state/part-00000-2838af3c-6df5-4021-87d8-d4fddbd11665-c000.csv (deflated 80%)
  adding: content/output/streamed_2021_state/_SUCCESS (stored 0%)
  adding: content/output/streamed_2021_state/.part-00000-2838af3c-6df5-4021-87d8-d4fddbd11665-c000.csv.crc (stored 0%)


In [39]:
from google.colab import files
files.download('/content/final_results.zip')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>