In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=27c843a8989c950ace298247d8eb744fe0680e364ec14e46c97de1b7d5352364
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a Spark session
spark = SparkSession.builder.appName("California_Customers").getOrCreate()

In [None]:
input_directory = "/content/drive/MyDrive/data-files/customers-tab-delimited/part-m-00000"
output_directory = "/content/drive/data-files/Answers/Scenario_1"
output_file = "cal_customers.csv"


In [None]:
#Schema
customer_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("customer_fname", StringType(), True),
    StructField("customer_lname", StringType(), True),
    StructField("customer_email", StringType(), True),
    StructField("customer_password", StringType(), True),
    StructField("customer_street", StringType(), True),
    StructField("customer_city", StringType(), True),
    StructField("customer_state", StringType(), True),
    StructField("customer_zipcode", StringType(), True)
])


# Load data from HDFS
data = spark.read.option("delimiter", "\t").csv(input_directory,schema=customer_schema)

# Filter customers from California
california_customers = data.filter(data.customer_state == "CA")
california_customers.show(10)

+-----------+--------------+--------------+--------------+-----------------+--------------------+--------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street| customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+--------------+--------------+----------------+
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|    San Marcos|            CA|           92069|
|         14|     Katherine|         Smith|     XXXXXXXXX|        XXXXXXXXX|5666 Hazy Pony Sq...|   Pico Rivera|            CA|           90660|
|         15|          Jane|          Luna|     XXXXXXXXX|        XXXXXXXXX|    673 Burning Glen|       Fontana|            CA|           92336|
|         18|        Robert|         Smith|     XXXXXXXXX|        XXXXXXXXX|2734 Hazy Butterf...|      Martinez|            CA|   

In [None]:
data.show(10)

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|           00725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

In [None]:
# Extract full names
california_customers = california_customers.withColumn("full_name", concat_ws(" ", california_customers["customer_fname"], california_customers["customer_lname"]))
california_customers.show(10)

+-----------+--------------+--------------+--------------+-----------------+--------------------+--------------+--------------+----------------+---------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street| customer_city|customer_state|customer_zipcode|      full_name|
+-----------+--------------+--------------+--------------+-----------------+--------------------+--------------+--------------+----------------+---------------+
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|    San Marcos|            CA|           92069|     Mary Jones|
|         14|     Katherine|         Smith|     XXXXXXXXX|        XXXXXXXXX|5666 Hazy Pony Sq...|   Pico Rivera|            CA|           90660|Katherine Smith|
|         15|          Jane|          Luna|     XXXXXXXXX|        XXXXXXXXX|    673 Burning Glen|       Fontana|            CA|           92336|      Jane Luna|
|         18|        Robert|      

In [None]:
Final_results = california_customers.drop("customer_fname", "customer_lname")

In [None]:
sorting_columns = ["customer_id", "full_name", "customer_email", "customer_password","customer_street", "customer_city", "customer_state", "customer_zipcode"]
Final_results = california_customers.select(*sorting_columns)
Final_results.show(5)

+-----------+---------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|      full_name|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+---------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          4|     Mary Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|           92069|
|         14|Katherine Smith|     XXXXXXXXX|        XXXXXXXXX|5666 Hazy Pony Sq...|  Pico Rivera|            CA|           90660|
|         15|      Jane Luna|     XXXXXXXXX|        XXXXXXXXX|    673 Burning Glen|      Fontana|            CA|           92336|
|         18|   Robert Smith|     XXXXXXXXX|        XXXXXXXXX|2734 Hazy Butterf...|     Martinez|            CA|           94553|
|         35|Margaret Wright|     XXXXXXXXX|        XXXXXXXXX|   9456 Sleepy Jetty|    Oce

In [None]:
Final_results
# Save the DataFrame as a CSV file
output_path="/content/drive/MyDrive/data-files/Answers/Scenario_1/Results.csv"
Final_results.write.csv(output_path, header=True, mode="overwrite")

print(f"DataFrame saved as CSV at: {output_path}")


DataFrame saved as CSV at: /content/drive/MyDrive/data-files/Answers/Scenario_1/Results.csv


In [None]:
input_path = "/content/drive/MyDrive/data-files/orders_parquet/741ca897-c70e-4633-b352-5dc3414c5680.parquet"
output_path = "/content/drive/MyDrive/data-files/Answers/Scenario_2/Results.csv"

# Read the parquet data
orders = spark.read.parquet(input_path)


In [None]:
orders.show(10)

+--------+-------------+-----------------+---------------+
|order_id|   order_date|order_customer_id|   order_status|
+--------+-------------+-----------------+---------------+
|       1|1374710400000|            11599|         CLOSED|
|       2|1374710400000|              256|PENDING_PAYMENT|
|       3|1374710400000|            12111|       COMPLETE|
|       4|1374710400000|             8827|         CLOSED|
|       5|1374710400000|            11318|       COMPLETE|
|       6|1374710400000|             7130|       COMPLETE|
|       7|1374710400000|             4530|       COMPLETE|
|       8|1374710400000|             2911|     PROCESSING|
|       9|1374710400000|             5657|PENDING_PAYMENT|
|      10|1374710400000|             5648|PENDING_PAYMENT|
+--------+-------------+-----------------+---------------+
only showing top 10 rows



In [None]:
# Filter orders with order status "COMPLETE"
completed_orders = orders.filter(col("order_status") == "COMPLETE")
completed_orders.show(10)

+--------+-------------+-----------------+------------+
|order_id|   order_date|order_customer_id|order_status|
+--------+-------------+-----------------+------------+
|       3|1374710400000|            12111|    COMPLETE|
|       5|1374710400000|            11318|    COMPLETE|
|       6|1374710400000|             7130|    COMPLETE|
|       7|1374710400000|             4530|    COMPLETE|
|      15|1374710400000|             2568|    COMPLETE|
|      17|1374710400000|             2667|    COMPLETE|
|      22|1374710400000|              333|    COMPLETE|
|      26|1374710400000|             7562|    COMPLETE|
|      28|1374710400000|              656|    COMPLETE|
|      32|1374710400000|             3960|    COMPLETE|
+--------+-------------+-----------------+------------+
only showing top 10 rows



In [None]:
# Include order number, order date and current situation
include_columns = ["order_id", "order_date", "order_status"]
filtered_orders_selected = completed_orders.select(*include_columns)
filtered_orders_selected.show(10)

+--------+-------------+------------+
|order_id|   order_date|order_status|
+--------+-------------+------------+
|       3|1374710400000|    COMPLETE|
|       5|1374710400000|    COMPLETE|
|       6|1374710400000|    COMPLETE|
|       7|1374710400000|    COMPLETE|
|      15|1374710400000|    COMPLETE|
|      17|1374710400000|    COMPLETE|
|      22|1374710400000|    COMPLETE|
|      26|1374710400000|    COMPLETE|
|      28|1374710400000|    COMPLETE|
|      32|1374710400000|    COMPLETE|
+--------+-------------+------------+
only showing top 10 rows



In [None]:
# Save the filtered data as a parquet file
filtered_orders_selected.write.csv(output_path, mode="overwrite")
print(f"Filtered data saved to: {output_path}")

Filtered data saved to: /content/drive/MyDrive/data-files/Answers/Scenario_2/Results.csv


In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#Schema
customer_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("customer_fname", StringType(), True),
    StructField("customer_lname", StringType(), True),
    StructField("customer_email", StringType(), True),
    StructField("customer_password", StringType(), True),
    StructField("customer_street", StringType(), True),
    StructField("customer_city", StringType(), True),
    StructField("customer_state", StringType(), True),
    StructField("customer_zipcode", StringType(), True)
])

# Load data from HDFS
data = spark.read.option("delimiter", "\t").csv(input_directory,schema=customer_schema)
data.show(10)

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|           00725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

In [None]:
# Filter customers who live in the city of "Caguas"
Cagus_customers = data.filter(data["customer_city"] == "Caguas")

# Define paths
output_path = "/content/drive/MyDrive/data-files/Answers/Scenario_3/Result.parquet"

# Save the filtered data as a parquet file
Cagus_customers.write.parquet(output_path, mode="overwrite")

Cagus_customers.show(10)

print(f"Filtered data saved to: {output_path}")

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|           00725|
|          5|        Robert|        Hudson|     XXXXXXXXX|        XXXXXXXXX|10 Crystal River ...|       Caguas|            PR|           00725|
|          7|       Melissa|        Wilcox|     XXXXXXXXX|        XXXXXXXXX|9453 High Concession|       Caguas|            PR|           00725|
|          9|          Mary|         Perez|     XXXXXXXXX|        XXXXXXXXX| 3616 Quaking Street|       Caguas|            PR|          

In [None]:
Cagus_customers.show()

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|           00725|
|          5|        Robert|        Hudson|     XXXXXXXXX|        XXXXXXXXX|10 Crystal River ...|       Caguas|            PR|           00725|
|          7|       Melissa|        Wilcox|     XXXXXXXXX|        XXXXXXXXX|9453 High Concession|       Caguas|            PR|           00725|
|          9|          Mary|         Perez|     XXXXXXXXX|        XXXXXXXXX| 3616 Quaking Street|       Caguas|            PR|          

In [None]:
input_path = "/content/drive/MyDrive/data-files/categories"
output_path = "/content/drive/MyDrive/data-files/Answers/Scenario_4/Results.csv"

# Reading the data
categories_df = spark.read.csv(input_path ,header =True)
categories_df.show()

+---+---+-------------------+
|  1|  2|           Football|
+---+---+-------------------+
|  2|  2|             Soccer|
|  3|  2|Baseball & Softball|
|  4|  2|         Basketball|
|  5|  2|           Lacrosse|
|  6|  2|   Tennis & Racquet|
|  7|  2|             Hockey|
|  8|  2|        More Sports|
|  9|  3|   Cardio Equipment|
| 10|  3|  Strength Training|
| 11|  3|Fitness Accessories|
| 12|  3|       Boxing & MMA|
| 13|  3|        Electronics|
| 14|  3|     Yoga & Pilates|
| 15|  3|  Training by Sport|
| 16|  3|    As Seen on  TV!|
| 17|  4|             Cleats|
| 18|  4|     Men's Footwear|
| 19|  4|   Women's Footwear|
| 20|  4|     Kids' Footwear|
| 21|  4|     Featured Shops|
+---+---+-------------------+
only showing top 20 rows



In [None]:
categories_df.write.csv(output_path, mode="overwrite")

print(f"Filtered data saved to: {output_path}")


input_path = "/content/drive/MyDrive/data-files/Answers/Scenario_4/Results.csv"
category_df = spark.read \
    .option("header", "true") \
    .csv(input_path)

output1_path = "/content/drive/MyDrive/data-files/Answers/Scenario_4/result1ls4compressed.csv"
category_df.write \
    .option("compression", "lz4") \
    .option("header", "true") \
    .csv(output1_path)

Filtered data saved to: /content/drive/MyDrive/data-files/Answers/Scenario_4/Results.csv


In [None]:
#scenario- 7

input_path = "/content/drive/MyDrive/data-files/orders_parquet/741ca897-c70e-4633-b352-5dc3414c5680.parquet"
output_path = "/content/drive/MyDrive/data-files/Answers/Scenario_7/Results.parquet"

# Read the Parquet data
orders_df = spark.read.parquet(input_path)

orders_df.show(10)


+--------+-------------+-----------------+---------------+
|order_id|   order_date|order_customer_id|   order_status|
+--------+-------------+-----------------+---------------+
|       1|1374710400000|            11599|         CLOSED|
|       2|1374710400000|              256|PENDING_PAYMENT|
|       3|1374710400000|            12111|       COMPLETE|
|       4|1374710400000|             8827|         CLOSED|
|       5|1374710400000|            11318|       COMPLETE|
|       6|1374710400000|             7130|       COMPLETE|
|       7|1374710400000|             4530|       COMPLETE|
|       8|1374710400000|             2911|     PROCESSING|
|       9|1374710400000|             5657|PENDING_PAYMENT|
|      10|1374710400000|             5648|PENDING_PAYMENT|
+--------+-------------+-----------------+---------------+
only showing top 10 rows



In [None]:
# Filter PENDING orders in July 2013
filtered_orders = orders_df.filter((col("order_status") == "PENDING_PAYMENT") &
                                   (year(from_unixtime(col("order_date") / 1000)) == 2013) &
                                   (month(from_unixtime(col("order_date") / 1000)) == 7))
filtered_orders.show(1000)




+--------+-------------+-----------------+---------------+
|order_id|   order_date|order_customer_id|   order_status|
+--------+-------------+-----------------+---------------+
|       2|1374710400000|              256|PENDING_PAYMENT|
|       9|1374710400000|             5657|PENDING_PAYMENT|
|      10|1374710400000|             5648|PENDING_PAYMENT|
|      13|1374710400000|             9149|PENDING_PAYMENT|
|      16|1374710400000|             7276|PENDING_PAYMENT|
|      19|1374710400000|             9488|PENDING_PAYMENT|
|      23|1374710400000|             4367|PENDING_PAYMENT|
|      27|1374710400000|             3241|PENDING_PAYMENT|
|      30|1374710400000|            10039|PENDING_PAYMENT|
|      33|1374710400000|             5793|PENDING_PAYMENT|
|      40|1374710400000|            12092|PENDING_PAYMENT|
|      41|1374710400000|             8136|PENDING_PAYMENT|
|      43|1374710400000|             7776|PENDING_PAYMENT|
|      47|1374710400000|             8487|PENDING_PAYMEN

In [None]:
# Select relevant columns and format order date
organizing_columns = ["order_id", "order_date", "order_customer_id", "order_status"]
calendar_format = filtered_orders.select(*organizing_columns) \
                                  .withColumn("order_date", date_format(from_unixtime(col("order_date") / 1000), "yyyy-MM-dd"))

# Show the DataFrame
calendar_format.show()




+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|       2|2013-07-25|              256|PENDING_PAYMENT|
|       9|2013-07-25|             5657|PENDING_PAYMENT|
|      10|2013-07-25|             5648|PENDING_PAYMENT|
|      13|2013-07-25|             9149|PENDING_PAYMENT|
|      16|2013-07-25|             7276|PENDING_PAYMENT|
|      19|2013-07-25|             9488|PENDING_PAYMENT|
|      23|2013-07-25|             4367|PENDING_PAYMENT|
|      27|2013-07-25|             3241|PENDING_PAYMENT|
|      30|2013-07-25|            10039|PENDING_PAYMENT|
|      33|2013-07-25|             5793|PENDING_PAYMENT|
|      40|2013-07-25|            12092|PENDING_PAYMENT|
|      41|2013-07-25|             8136|PENDING_PAYMENT|
|      43|2013-07-25|             7776|PENDING_PAYMENT|
|      47|2013-07-25|             8487|PENDING_PAYMENT|
|      52|2013-07-25|             5126|PENDING_P

In [None]:
# Save the filtered data as a Parquet file
calendar_format.write.parquet(output_path, mode="overwrite")

print(f"Filtered data saved to: {output_path}")

Filtered data saved to: /content/drive/MyDrive/data-files/Answers/Scenario_7/Results.parquet


In [None]:
input_path = "/content/drive/MyDrive/data-files/products_avro/products.csv"
df = spark.read.csv(input_path, header=True, inferSchema=True)
df.show(1000)


+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|      1009|                 45|Diamond Fear No E...|               null|       599.99|http://images.acm...|
|      1010|                 46|DBX Vector Series...|               null|        19.98|http://images.acm...|
|      1011|                 46|Old Town Canoe Sa...|               null|       499.99|http://images.acm...|
|      1012|                 46|Pelican Trailblaz...|               null|       299.99|http://images.acm...|
|      1013|                 46|Perception Sport ...|               null|       349.99|http://images.acm...|
|      1014|                 46|O'Brien Men's Neo...|               null|        49.98|http://images.acm...|
|      1015|       

In [None]:
# Filter products with price greater than 1000.0
MoreThan100_df = df.filter(col("product_price") > 1000.0)

# Define the output directory
output_path = "/content/drive/MyDrive/data-files/Answers/Scenario_5/Results.csv"

# Save the filtered DataFrame as CSV
MoreThan100_df.write.csv(output_path, header=True, mode="overwrite")


In [None]:
MoreThan100_df.show()

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|      1048|                 47|"Spalding Beast 6...|               null|      1099.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+



In [None]:
#scenario 6

In [None]:
input_path = "/content/drive/MyDrive/data-files/products_avro/products.csv"
output_path = "/content/drive/MyDrive/data-files/Answers/Scenario_6/Result"

# Read the CSV data
only_df = spark.read.option("header", "true").csv(input_path)
only_df.show(1000)


+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|      1009|                 45|Diamond Fear No E...|               null|       599.99|http://images.acm...|
|      1010|                 46|DBX Vector Series...|               null|        19.98|http://images.acm...|
|      1011|                 46|Old Town Canoe Sa...|               null|       499.99|http://images.acm...|
|      1012|                 46|Pelican Trailblaz...|               null|       299.99|http://images.acm...|
|      1013|                 46|Perception Sport ...|               null|       349.99|http://images.acm...|
|      1014|                 46|O'Brien Men's Neo...|               null|        49.98|http://images.acm...|
|      1015|       

In [None]:
# Filter products with a price greater than 1000.0 and containing "Treadmill" in product name
only_products = only_df.filter((col("product_price") > 1000.0) &col("product_name").contains("Treadmill"))
only_products.show()



+----------+-------------------+------------+-------------------+-------------+-------------+
|product_id|product_category_id|product_name|product_description|product_price|product_image|
+----------+-------------------+------------+-------------------+-------------+-------------+
+----------+-------------------+------------+-------------------+-------------+-------------+



In [None]:
# Save the filtered data as a CSV file
only_products.write.csv(output_path, mode="overwrite", header=True)

print(f"Filtered data saved to: {output_path}")

Filtered data saved to: /content/drive/MyDrive/data-files/Answers/Scenario_6/Result
