In [2]:
!python --version

Python 3.10.12


In [3]:
!pwd

/content


In [4]:

!wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

--2024-03-21 18:56:10--  https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400446614 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.1-bin-hadoop3.tgz’


2024-03-21 18:56:12 (183 MB/s) - ‘spark-3.5.1-bin-hadoop3.tgz’ saved [400446614/400446614]



In [5]:
!pyspark

/bin/bash: line 1: pyspark: command not found


In [None]:
!tar -xvzf spark-3.5.1-bin-hadoop3.tgz

In [7]:
!ls/content/spark-3.5.1-bin-hadoop3

/bin/bash: line 1: ls/content/spark-3.5.1-bin-hadoop3: No such file or directory


In [8]:
!sudo ls /content/spark-3.5.1-bin-hadoop3
!pip install findspark

bin   data	jars	    LICENSE   NOTICE  R		 RELEASE  yarn
conf  examples	kubernetes  licenses  python  README.md  sbin
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [9]:
import os
os.environ["SPARK_HOME"]= "/content/spark-3.5.1-bin-hadoop3"

import findspark
findspark.init()

In [10]:
from pyspark.sql import SparkSession


In [11]:


# Initialize SparkSession
spark = SparkSession.builder \
    .appName("CSV-to-Hive Schema Matching") \
    .getOrCreate()

# Define the schema of the Hive table
hive_table_schema = "id INT, name STRING, age INT"  # Example schema

# Read the first few rows of the CSV file to infer its schema
csv_file_path = "/content/Top250.csv"  # Specify the path to your CSV file
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Print the inferred schema of the CSV file
print("Schema of the CSV file:")
df.printSchema()

# Compare the inferred schema with the schema of the Hive table
if df.schema.simpleString() == hive_table_schema:
    print("CSV schema matches the schema of the Hive table.")
else:
    print("CSV schema does not match the schema of the Hive table.")

# Stop the SparkSession
spark.stop()


Schema of the CSV file:
root
 |-- Rank: integer (nullable = true)
 |-- Restaurant: string (nullable = true)
 |-- Content: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- YOY_Sales: string (nullable = true)
 |-- Units: string (nullable = true)
 |-- YOY_Units: string (nullable = true)
 |-- Headquarters: string (nullable = true)
 |-- Segment_Category: string (nullable = true)

CSV schema does not match the schema of the Hive table.


In [12]:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Create Hive Tables from CSV") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

# Define schemas for each CSV file
schema_csv1 = StructType([
    StructField("Rank", IntegerType(), True),
    StructField("Restaurant", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("Sales", IntegerType(), True),
    StructField("YOY_Sales", StringType(), True),
    StructField("Units", IntegerType(), True),
    StructField("YOY_Units", StringType(), True),
    StructField("Unit_Volume", IntegerType(), True),
    StructField("Franchising", StringType(), True)
])

schema_csv2 = StructType([
    StructField("Rank", IntegerType(), True),
    StructField("Restaurant", StringType(), True),
    StructField("Sales", DoubleType(), True),
    StructField("Average Check", IntegerType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Meals Served", DoubleType(), True)
])

schema_csv3 = StructType([
    StructField("Rank", IntegerType(), True),
    StructField("Restaurant", StringType(), True),
    StructField("Content", StringType(), True),
    StructField("Sales", StringType(), True),
    StructField("YOY_Sales", StringType(), True),
    StructField("Units", StringType(), True),
    StructField("YOY_Units", StringType(), True),
    StructField("Headquarters", StringType(), True),
    StructField("Segment_Category", StringType(), True)
])

# Load data from CSV files into DataFrames
csv1_path = "/content/Future50.csv"  # Path to CSV file 1
csv2_path = "/content/Independence100.csv"  # Path to CSV file 2
csv3_path = "/content/Top250.csv"  # Path to CSV file 3

df_csv1 = spark.read.csv(csv1_path, header=True, schema=schema_csv1)
df_csv2 = spark.read.csv(csv2_path, header=True, schema=schema_csv2)
df_csv3 = spark.read.csv(csv3_path, header=True, schema=schema_csv3)

# Save DataFrames as Hive tables
df_csv1.write.mode("overwrite").saveAsTable("table_csv1")
df_csv2.write.mode("overwrite").saveAsTable("table_csv2")
df_csv3.write.mode("overwrite").saveAsTable("table_csv3")

# Stop the SparkSession
spark.stop()


In [18]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Query Hive Table") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()


In [19]:
# Query 1: Retrieve the first 10 rows from table_csv1 with the highest sales
query_1 = """
    SELECT *
    FROM (
        SELECT *, ROW_NUMBER() OVER (ORDER BY Sales DESC) AS rn
        FROM table_csv1
    ) t
    WHERE rn <= 10
"""
result_1 = spark.sql(query_1)
result_1.show()

+----+--------------------+--------------------+-----+---------+-----+---------+-----------+-----------+---+
|Rank|          Restaurant|            Location|Sales|YOY_Sales|Units|YOY_Units|Unit_Volume|Franchising| rn|
+----+--------------------+--------------------+-----+---------+-----+---------+-----------+-----------+---+
|   5|           Pokeworks|      Irvine, Calif.|   49|    77.1%|   50|    56.3%|       1210|        Yes|  1|
|  41|Blue Sushi Sake G...|         Omaha, Neb.|   49|    19.5%|   14|    16.7%|       3500|         No|  2|
|  14|      Bluestone Lane|      New York, N.Y.|   48|    33.0%|   48|    37.1%|       1175|         No|  3|
|  46|         LA Crawfish|      McAllen, Texas|   48|    17.6%|   25|    13.6%|       2050|        Yes|  4|
|  24|     Joe & The Juice|      New York, N.Y.|   47|    25.9%|   69|    25.5%|        760|        Yes|  5|
|  42|      The Human Bean|       Medford, Ore.|   47|    19.0%|   97|    19.8%|        535|        Yes|  6|
|  47|             

In [20]:
# Query 2: Count the total number of rows in table_csv2 and table_csv3 and calculate their sum
query_2 = """
    SELECT 'table_csv2' AS Table, COUNT(*) AS Row_Count FROM table_csv2
    UNION ALL
    SELECT 'table_csv3' AS Table, COUNT(*) AS Row_Count FROM table_csv3
"""
result_2 = spark.sql(query_2)
result_2.show()

+----------+---------+
|     Table|Row_Count|
+----------+---------+
|table_csv2|      100|
|table_csv3|      250|
+----------+---------+



In [21]:
# Query 3: Retrieve unique cities from table_csv2 along with their total sales
query_3 = """
    SELECT City, COUNT(*) AS Total_Restaurants, SUM(Sales) AS Total_Sales
    FROM table_csv2
    GROUP BY City
    ORDER BY Total_Sales DESC
"""
result_3 = spark.sql(query_3)
result_3.show()

+--------------+-----------------+------------+
|          City|Total_Restaurants| Total_Sales|
+--------------+-----------------+------------+
|      New York|               21|4.06473807E8|
|       Chicago|               15|2.68481978E8|
|     Las Vegas|               11|2.05296684E8|
|    Washington|                9|1.61413973E8|
| San Francisco|                5| 6.7681136E7|
|      Orlando |                2| 5.5047864E7|
|         Miami|                3| 5.4481741E7|
|Ft. Lauderdale|                2| 3.4301433E7|
|  Indianapolis|                2| 3.4232062E7|
|   Frankenmuth|                2| 3.3452435E7|
|       Atlanta|                2|  3.278875E7|
|     Nashville|                2| 2.7463743E7|
|       Raleigh|                1|  2.426816E7|
|   Miami Beach|                1|      2.38E7|
|     Oak Brook|                1| 1.9831818E7|
|       Houston|                1| 1.9530159E7|
|  Philadelphia|                1| 1.9379153E7|
|      Wheeling|                1| 1.868

In [41]:
# Query 4: Analyze the distribution of restaurant sales by month from table_csv1
query4_result = spark.sql("""
    SELECT Month, AVG(Sales) AS Avg_Sales
    FROM (
        SELECT SUBSTRING_INDEX(Location, '-', -1) AS Month, Sales
        FROM table_csv1
    ) t
    GROUP BY Month
    ORDER BY Month
""")
query4_result.show()

+--------------------+------------------+
|               Month|         Avg_Sales|
+--------------------+------------------+
|Agoura Hills, Calif.|              31.0|
|     Anaheim, Calif.|              27.0|
|        Atlanta, Ga.|              29.0|
|       Belmar,  N.J.|              39.0|
|      Blue Bell, Pa.|              24.0|
|     Charlotte, N.C.|              42.0|
|      Columbus, Ohio|41.333333333333336|
|        Conway, Ark.|              25.0|
|       Denver, Colo.|              41.0|
|         Doral, Fla.|              32.0|
|        Douglas, Ga.|              22.0|
|       Fairburn, Ga.|              38.0|
|        Fairfax, Va.|              22.0|
|       Frisco, Texas|              20.0|
|Huntington Beach,...|              21.0|
|      Irvine, Calif.|              49.0|
|     Kettering, Ohio|              24.0|
| Los Angeles, Calif.|              28.0|
|     Louisville, Ky.|              29.0|
|      McAllen, Texas|              48.0|
+--------------------+------------

In [40]:
# Query 5: Determine the top 3 restaurant chains with the highest average sales per unit from table_csv1
query5_result = spark.sql("""
    SELECT Restaurant, AVG(Sales / Units) AS Avg_Sales_Per_Unit
    FROM table_csv1
    GROUP BY Restaurant
    ORDER BY Avg_Sales_Per_Unit DESC
    LIMIT 3
""")
query5_result.show()

+--------------------+------------------+
|          Restaurant|Avg_Sales_Per_Unit|
+--------------------+------------------+
|     Bulla Gastrobar|               4.0|
|            Boqueria| 3.857142857142857|
|Blue Sushi Sake G...|               3.5|
+--------------------+------------------+



In [46]:
# Query 6: Calculate the average sales growth rate per restaurant from table_csv3
query6_result = spark.sql("""
    SELECT Restaurant, AVG(CAST(REGEXP_REPLACE(YOY_Sales, '%', '') AS DOUBLE)) AS Avg_Sales_Growth_Rate
    FROM table_csv3
    GROUP BY Restaurant
""")
query6_result.show()

+--------------------+---------------------+
|          Restaurant|Avg_Sales_Growth_Rate|
+--------------------+---------------------+
|          Applebee's|                 -3.0|
|    McAlister's Deli|                  7.0|
|       Uncle Julio's|                 11.7|
|       Buca di Beppo|                 -5.7|
|  The Capital Grille|                  4.7|
|            Cinnabon|                  5.6|
|   Wetzel's Pretzels|                  3.9|
|          Bojangles'|                  2.7|
|Tropical Smoothie...|                 21.8|
|         Red Lobster|                  1.6|
|         O'Charley's|                 -7.6|
|         Smashburger|                 -6.5|
|Old Country Buffe...|                -18.8|
|Captain D's Seafo...|                  3.1|
|         Sonny's BBQ|                 -8.3|
|  Buffalo Wild Wings|                 -0.1|
|Islands Fine Burg...|                  0.9|
|       Pret A Manger|                  4.7|
|          Giordano's|                  5.5|
|Checkers 

In [30]:
# Query 6: Retrieve restaurants with 'YOY_Sales' indicating growth and their corresponding segments from table_csv3
query_6 = """
    SELECT t1.Restaurant, t1.Segment_Category
    FROM table_csv3 t1
    JOIN (
        SELECT DISTINCT Restaurant
        FROM table_csv3
        WHERE YOY_Sales LIKE '%growth%'
    ) t2 ON t1.Restaurant = t2.Restaurant
"""
result_6 = spark.sql(query_6)
result_6.show(10)

+----------+----------------+
|Restaurant|Segment_Category|
+----------+----------------+
+----------+----------------+



In [37]:
# Query 7: Find the restaurants with the highest increase in sales compared to the previous year in table_csv3
query7_result = spark.sql("""
    SELECT Restaurant, (Sales - PREVIOUS_YEAR_SALES) AS Sales_Increase
    FROM (
        SELECT Restaurant, CAST(SUBSTRING(YOY_Sales, 1, LENGTH(YOY_Sales) - 1) AS FLOAT) / 100 * Sales AS PREVIOUS_YEAR_SALES, Sales
        FROM table_csv3
    ) t
    ORDER BY Sales_Increase DESC
    LIMIT 10
""")
query7_result.show()


+------------+-----------------+
|  Restaurant|   Sales_Increase|
+------------+-----------------+
|  McDonald's|38431.81196146012|
|   Starbucks|19541.31991844177|
|      Subway|          10404.0|
|   Taco Bell|         10276.63|
| Burger King|9928.491995134355|
| Chick-fil-A|           9848.4|
|     Wendy's|9351.996018619537|
|    Domino's|6557.963993282318|
|Panera Bread|           5654.4|
|   Pizza Hut| 5524.65199867487|
+------------+-----------------+



In [32]:
# Query 8: Retrieve restaurants with sales greater than the average sales of the previous year in table_csv2
avg_sales_last_year = spark.sql("SELECT AVG(Sales) AS Avg_Sales_Last_Year FROM table_csv2").collect()[0][0]
query_8 = f"""
    SELECT Restaurant
    FROM table_csv2
    WHERE Sales > {avg_sales_last_year}
"""
result_8 = spark.sql(query_8)
result_8.show()

+--------------------+
|          Restaurant|
+--------------------+
|Carmine's (Times ...|
|The Boathouse Orl...|
|    Old Ebbitt Grill|
|LAVO Italian Rest...|
|Bryant Park Grill...|
|Gibsons Bar & Ste...|
|Top of the World ...|
|         Maple & Ash|
|           Balthazar|
|   Smith & Wollensky|
|          Angus Barn|
|           Prime 112|
|Joe's Seafood, Pr...|
|Junior's (Times S...|
|        The Hamilton|
|Joe's Seafood, Pr...|
|Joe's Seafood, Pr...|
|      Gibsons Italia|
|              Komodo|
|            Buddakan|
+--------------------+
only showing top 20 rows



In [35]:
# Query 9: Retrieve top 5 restaurants with the highest sales from table_csv1 along with their units sold
query_9 = """
    SELECT t1.Restaurant, t1.Sales, t1.Units
    FROM table_csv1 t1
    JOIN (
        SELECT Restaurant, Sales
        FROM table_csv1
        ORDER BY Sales DESC
        LIMIT 5
    ) t2 ON t1.Restaurant = t2.Restaurant
"""
result_9 = spark.sql(query_9)
result_9.show()


+--------------------+-----+-----+
|          Restaurant|Sales|Units|
+--------------------+-----+-----+
|           Pokeworks|   49|   50|
|      Bluestone Lane|   48|   48|
|     Joe & The Juice|   47|   69|
|Blue Sushi Sake G...|   49|   14|
|         LA Crawfish|   48|   25|
+--------------------+-----+-----+



In [36]:

# Query 10: Retrieve restaurants with 'Franchising' status from table_csv1 and their corresponding locations
query_10 = """
    SELECT t1.Restaurant, t1.Location
    FROM table_csv1 t1
    JOIN (
        SELECT DISTINCT Restaurant
        FROM table_csv1
        WHERE Franchising IS NOT NULL
    ) t2 ON t1.Restaurant = t2.Restaurant
"""
result_10 = spark.sql(query_10)
result_10.show()


+--------------------+--------------------+
|          Restaurant|            Location|
+--------------------+--------------------+
|          Evergreens|      Seattle, Wash.|
|         Clean Juice|     Charlotte, N.C.|
|            Slapfish|Huntington Beach,...|
|          Clean Eatz|    Wilmington, N.C.|
|           Pokeworks|      Irvine, Calif.|
|         Playa Bowls|       Belmar,  N.J.|
|    The Simple Greek|      Blue Bell, Pa.|
|           Melt Shop|      New York, N.Y.|
|          Creamistry|Yorba Linda,  Calif.|
|Joella's Hot Chicken|     Louisville, Ky.|
|       Eggs Up Grill|   Spartanburg, S.C.|
|            Dog Haus|    Pasadena, Calif.|
|    Teriyaki Madness|       Denver, Colo.|
|      Bluestone Lane|      New York, N.Y.|
|   Original ChopShop|        Plano, Texas|
|   Rapid Fired Pizza|     Kettering, Ohio|
|Ike's Love & Sand...|San Francisco, Ca...|
|      Vitality Bowls|   San Ramon, Calif.|
|Hawkers Asian Str...|       Orlando, Fla.|
|Maple Street Bisc...|   Orange 