In [0]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Create the Spark session for this notebook, or reuse an existing one via getOrCreate().
# Hive support is enabled so the saveAsTable() calls below register their tables
# in the Hive metastore.
spark = (
    SparkSession.builder
    .appName("iPhone Store")
    .enableHiveSupport()
    .getOrCreate()
)

In [0]:
# Input files (presumably on Databricks DBFS, given the /FileStore prefix).
# Both are read below as "|"-delimited text with a header row.
sales_file_path, product_file_path = (
    "/FileStore/FileStore/sales_data-9.txt",
    "/FileStore/FileStore/product_data-6.txt",
)


**Sales Collector API**

In [0]:
def sales_data_collector_api(spark, sales_file_path, show_distinct_dates=True):
    """Load the pipe-delimited sales file into a typed, cleaned DataFrame.

    Parameters
    ----------
    spark : SparkSession
        Active session used to read the file.
    sales_file_path : str
        Path to a "|"-delimited text file whose first line is a header naming the
        columns seller_id, product_id, buyer_id, sale_date, quantity and price.
    show_distinct_dates : bool, default True
        When True, print the distinct raw sale_date strings as a quick check that
        they match the "yyyy-MM-dd" pattern. This runs an extra Spark job, so pass
        False to skip it.

    Returns
    -------
    DataFrame
        seller_id, product_id, buyer_id, quantity and price as int, and sale_date
        as date. Rows whose seller_id or sale_date is NULL after casting/parsing
        are dropped.
    """
    # The header option consumes the first line, so no header row remains in the data.
    sales_raw = (
        spark.read
        .option("delimiter", "|")
        .option("header", "true")
        .csv(sales_file_path)
    )

    if show_distinct_dates:
        # Inspect the raw strings before parsing to confirm the date format.
        sales_raw.select("sale_date").distinct().show()

    # NOTE: with ANSI mode (spark.sql.ansi.enabled=true), invalid values may raise
    # instead of becoming NULL.
    sales_typed = sales_raw.select(
        col("seller_id").cast("int").alias("seller_id"),
        col("product_id").cast("int").alias("product_id"),
        col("buyer_id").cast("int").alias("buyer_id"),
        to_date(col("sale_date"), "yyyy-MM-dd").alias("sale_date"),
        col("quantity").cast("int").alias("quantity"),
        col("price").cast("int").alias("price"),
    )

    # Filter AFTER casting. A non-numeric seller_id or an unparseable sale_date
    # only becomes NULL once cast/parsed, so filtering the raw strings would let
    # those malformed rows through.
    return sales_typed.filter(
        col("seller_id").isNotNull() & col("sale_date").isNotNull()
    )


# Build the cleaned sales DataFrame and persist it as a Parquet-backed Hive table,
# replacing any previous version of that table.
sales_df = sales_data_collector_api(spark, sales_file_path)
sales_table_name = "sales_table_parquet"
(
    sales_df.write
    .mode("overwrite")
    .format("parquet")
    .saveAsTable(sales_table_name)
)

# Read the table back through Spark SQL to verify the write.
result_df = spark.sql(f"SELECT * FROM {sales_table_name}")
result_df.show()

+----------+
| sale_date|
+----------+
|2019-05-13|
|2019-06-02|
|2019-01-21|
|2019-02-17|
+----------+

+---------+----------+--------+----------+--------+-----+
|seller_id|product_id|buyer_id| sale_date|quantity|price|
+---------+----------+--------+----------+--------+-----+
|        1|         1|       1|2019-01-21|       2| 2000|
|        1|         2|       2|2019-02-17|       1|  800|
|        2|         1|       3|2019-06-02|       1|  800|
|        3|         3|       3|2019-05-13|       2| 2800|
+---------+----------+--------+----------+--------+-----+



The `sales_data_collector_api` function loads sales data from a pipe-delimited (`|`) text file whose first row contains the column headers. It drops rows with NULL values in the critical fields `seller_id` and `sale_date` to ensure data integrity. It casts the columns to their appropriate types: `sale_date` becomes a date (`yyyy-MM-dd`), and the numeric fields (`seller_id`, `product_id`, `buyer_id`, `quantity`, `price`) become integers. The calling cell then saves the resulting DataFrame as a Hive table named `sales_table_parquet` in Parquet format, overwriting any existing table. Finally, the cell queries the table and displays its contents to confirm the data was written correctly. This process ensures that the sales data is properly structured and stored in a Hive table for further analysis.

**Product Collector API**

In [0]:
def product_data_collector_api(spark, product_file_path, table_name="product_table_parquet"):
    """Load the pipe-delimited product file, cast its columns, and save it as a Hive table.

    Parameters
    ----------
    spark : SparkSession
        Active session used to read the file and write the table.
    product_file_path : str
        Path to a "|"-delimited text file whose first line is a header naming the
        columns product_id, product_name and unit_price.
    table_name : str, default "product_table_parquet"
        Hive table that receives the data. It is overwritten if it already exists.

    Returns
    -------
    DataFrame
        The typed product data that was written: product_id and unit_price as
        int, product_name as string.
    """
    # Column names come from the file's header row (header=True); none are assigned here.
    products_raw = spark.read.option("delimiter", "|").csv(product_file_path, header=True)

    # `col` comes from the `pyspark.sql.functions` wildcard import at the top of the
    # notebook, as in the sales collector. `F` is never imported, so `F.col`
    # raised NameError on a fresh kernel.
    products = products_raw.select(
        col("product_id").cast("int").alias("product_id"),
        col("product_name").cast("string").alias("product_name"),
        col("unit_price").cast("int").alias("unit_price"),
    )

    products.write.format("parquet").mode("overwrite").saveAsTable(table_name)
    return products


# Load, cast, and persist the product catalogue as a Hive table.
product_data_collector_api(spark, product_file_path)

# Verify the write by reading the Hive table back and displaying it.
product_table_name = "product_table_parquet"
result_df = spark.table(product_table_name)
result_df.show()



+----------+------------+----------+
|product_id|product_name|unit_price|
+----------+------------+----------+
|         1|          S8|      1000|
|         2|          G4|       800|
|         3|      iPhone|      1400|
+----------+------------+----------+



The `product_data_collector_api` function loads product data from a pipe-delimited (`|`) text file, treating the first row as headers. It casts the columns to the appropriate data types: `product_id` and `unit_price` become integers, and `product_name` becomes a string. It then writes the resulting DataFrame to a Hive table named `product_table_parquet` in Parquet format, overwriting any existing data in the table. Finally, the calling cell queries the Hive table and displays its contents to confirm the data was written. This process stores the product data in a structured, queryable format in Hive for further analysis.

**Data Preparation API**

In [0]:
def get_buyers_s8_not_iphone(spark, product_table_parquet, sales_table_parquet, target_table,
                             bought_product="S8", excluded_product="iPhone"):
    """Find buyers who bought one product but never another, and save them to a Hive table.

    Parameters
    ----------
    spark : SparkSession
        Active session used to read and write the tables.
    product_table_parquet : str
        Name of the Hive table holding at least product_id and product_name.
    sales_table_parquet : str
        Name of the Hive table holding at least product_id and buyer_id.
    target_table : str
        Hive table that receives the result. It is overwritten if it already exists.
    bought_product : str, default "S8"
        product_name the buyers must have purchased.
    excluded_product : str, default "iPhone"
        product_name the buyers must NOT have purchased.

    Returns
    -------
    DataFrame
        A single buyer_id column with the distinct buyers of ``bought_product``
        who have no purchase of ``excluded_product``. The same data is written
        to ``target_table``.

    Raises
    ------
    ValueError
        If either product name has no non-NULL product_id in the product table.
    """
    products = spark.table(product_table_parquet)
    sales = spark.table(sales_table_parquet)

    def _product_ids(name):
        # Collect every non-NULL id carrying this name. `.first()` would silently
        # pick one id if several products shared a name.
        rows = (
            products.filter(col("product_name") == name)
            .select("product_id")
            .distinct()
            .collect()
        )
        return [row["product_id"] for row in rows if row["product_id"] is not None]

    def _buyers_of(product_ids):
        # Column expressions instead of f-string SQL predicates, so any id value is handled.
        return (
            sales.filter(col("product_id").isin(product_ids))
            .select("buyer_id")
            .distinct()
        )

    bought_ids = _product_ids(bought_product)
    excluded_ids = _product_ids(excluded_product)
    if not bought_ids or not excluded_ids:
        raise ValueError(
            f"One or both of the products '{bought_product}' or '{excluded_product}' "
            "are not found in the product table."
        )

    # left_anti keeps left-side buyers that have no matching buyer_id on the right.
    result = _buyers_of(bought_ids).join(_buyers_of(excluded_ids), "buyer_id", "left_anti")

    result.write.format("parquet").mode("overwrite").saveAsTable(target_table)
    return result

# Name of the table that receives the result. It must be defined at notebook level:
# `target_table` otherwise exists only as a parameter inside get_buyers_s8_not_iphone,
# so the verification query below raised NameError on a fresh kernel.
target_table = "buyers_s8_not_iphone_table"

buyers_s8_not_iphone_result = get_buyers_s8_not_iphone(
    spark, "product_table_parquet", "sales_table_parquet", target_table
)

# Query the target Hive table to confirm the data was written.
result_df = spark.sql(f"SELECT * FROM {target_table}")
result_df.show()






+--------+
|buyer_id|
+--------+
|       1|
+--------+



The `get_buyers_s8_not_iphone` function loads the product and sales data from Hive tables, looks up the product IDs for 'S8' and 'iPhone', and raises an error if either product is missing. It then retrieves the distinct buyer IDs for each product from the sales data. A left anti join keeps only the buyers who bought 'S8' but not 'iPhone'. The result is written to the target Hive table in Parquet format, overwriting any existing data. Finally, the calling cell queries the target table and displays the result to confirm the write. This process is useful for analyzing customer purchasing patterns and segmenting buyers based on specific product preferences.

In [0]:
# List the tables in the current database to confirm that each write above registered a table.
tables_df = spark.sql("SHOW TABLES")
tables_df.show()


+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|buyers_s8_not_iph...|      false|
| default|product_table_par...|      false|
| default| sales_table_parquet|      false|
+--------+--------------------+-----------+

