In [13]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Datacamp Pyspark Tutorial")\
.config("spark.memory.offHeap. enabled", "true") . config("spark.memory.offHeap. size", "10g") \
.getOrCreate()

In [14]:
file_path = r"C:\Users\abdel\OneDrive\Desktop\Sub\BigData\Csv. files\customers.csv"
# Read the CSV file into a DataFrame
df_customers = spark.read.csv(file_path, header=True, inferSchema=True)
df_customers.show()
df_customers .printSchema()
print("Count of dataframe:",df_customers.count())

+-----------+-------------------+-----------+---+-------------------------------+--------+-----------------+----------------------------+---------+
|customer_id|customer_name      |gender     |age|home_address                   |zip_code|city             |state                       |country  |
+-----------+-------------------+-----------+---+-------------------------------+--------+-----------------+----------------------------+---------+
|1          |Leanna Busson      |Female     |30 |8606 Victoria TerraceSuite 560 |5464    |Johnstonhaven    |Northern Territory          |Australia|
|2          |Zabrina Harrowsmith|Genderfluid|69 |8327 Kirlin SummitApt. 461     |8223    |New Zacharyfort  |South Australia             |Australia|
|3          |Shina Dullaghan    |Polygender |59 |269 Gemma SummitSuite 109      |5661    |Aliburgh         |Australian Capital Territory|Australia|
|4          |Hewet McVitie      |Bigender   |67 |743 Bailey GroveSuite 141      |1729    |South Justinhaven|Quee

In [15]:
df_customers.describe().show()

+-------+-----------------+--------------+----------+------------------+--------------------+-----------------+---------+--------------------+---------+
|summary|      customer_id| customer_name|    gender|               age|        home_address|         zip_code|     city|               state|  country|
+-------+-----------------+--------------+----------+------------------+--------------------+-----------------+---------+--------------------+---------+
|  count|             1000|          1000|      1000|              1000|                1000|             1000|     1000|                1000|     1000|
|   mean|            500.5|          null|      null|             49.86|                null|         5004.872|     null|                null|     null|
| stddev|288.8194360957494|          null|      null|17.647828360618387|                null|2884.497332027621|     null|                null|     null|
|    min|                1|Abbot Rickaert|   Agender|                20|00 Fadel C

In [16]:
df_customers.describe(["age", "zip_code"]).show()


+-------+------------------+-----------------+
|summary|               age|         zip_code|
+-------+------------------+-----------------+
|  count|              1000|             1000|
|   mean|             49.86|         5004.872|
| stddev|17.647828360618387|2884.497332027621|
|    min|                20|                2|
|    max|                80|             9998|
+-------+------------------+-----------------+



In [31]:
df_customers.groupBy("gender").count().show()
df_customers.groupBy("country").count().show()

+-----------+-----+
|     gender|count|
+-----------+-----+
|Genderqueer|  127|
|    Agender|  114|
|     Female|  115|
| Polygender|  128|
|   Bigender|  120|
| Non-binary|  131|
|       Male|  143|
|Genderfluid|  122|
+-----------+-----+

+---------+-----+
|  country|count|
+---------+-----+
|Australia| 1000|
+---------+-----+



In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Handling Missing Values
# Drop rows with any null values
df_customers_cleaned = df_customers.dropna()

# Handling Duplicates
df_customers_cleaned =df_customers_cleaned.dropDuplicates()

# Handling Outliers (Example: Removing outliers in 'age' column)
# Assuming outliers are defined as age < 0 or age > 100
df_customerscleaned =df_customers_cleaned.filter((col("age") >= 0) & (col("age") <= 100))
df_customerscleaned.show(30)

+-----------+-------------------+-----------+---+--------------------+--------+--------------------+--------------------+---------+
|customer_id|      customer_name|     gender|age|        home_address|zip_code|                city|               state|  country|
+-----------+-------------------+-----------+---+--------------------+--------+--------------------+--------------------+---------+
|        152|    Panchito Wybern|     Female| 79|03 Glover TrackAp...|    7845|       Christianport|Australian Capita...|Australia|
|        540|       Maude McCaig|Genderfluid| 75|297 Ferry LaneApt...|    8550|         East Amelia|     South Australia|Australia|
|        851|      Worthy Pardoe| Non-binary| 71|71 Joseph TrailAp...|    6648|     North Emmashire|Australian Capita...|Australia|
|        903|   Leonhard Webland|       Male| 48|9694 Nathan Trail...|    2370|          Walkerfurt|     New South Wales|Australia|
|        941|   Egon Figliovanni|     Female| 59|617 Morrison Cres...|     1

In [18]:
# Count rows before dropping duplicates
original_count = df_customers.count()

# Drop duplicates based on all columns
df_no_duplicates = df_customers.dropDuplicates()

# Count rows after dropping duplicates
no_duplicates_count = df_no_duplicates.count()

# Check if there were duplicates
if original_count == no_duplicates_count:
    print("No duplicates found.")
else:
    print(f"Duplicates found and removed. Original count: {original_count}, Count after removing duplicates: {no_duplicates_count}")

# Show the cleaned DataFrame without duplicates
df_no_duplicates.show()


No duplicates found.
+-----------+-------------------+-----------+---+--------------------+--------+--------------------+--------------------+---------+
|customer_id|      customer_name|     gender|age|        home_address|zip_code|                city|               state|  country|
+-----------+-------------------+-----------+---+--------------------+--------+--------------------+--------------------+---------+
|        152|    Panchito Wybern|     Female| 79|03 Glover TrackAp...|    7845|       Christianport|Australian Capita...|Australia|
|        540|       Maude McCaig|Genderfluid| 75|297 Ferry LaneApt...|    8550|         East Amelia|     South Australia|Australia|
|        851|      Worthy Pardoe| Non-binary| 71|71 Joseph TrailAp...|    6648|     North Emmashire|Australian Capita...|Australia|
|        903|   Leonhard Webland|       Male| 48|9694 Nathan Trail...|    2370|          Walkerfurt|     New South Wales|Australia|
|        941|   Egon Figliovanni|     Female| 59|617 Mo

In [1]:
from pyspark.sql.functions import countDistinct
# Group by 'Country' column and aggregate with countDistinct on 'customer_id'
country_counts = df_customers.groupBy('state').agg(countDistinct('customer_id').alias('state_count'))
# Show the results
country_counts.show()


NameError: name 'df_customers' is not defined

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct, avg

# Create a Spark session
spark = SparkSession.builder.appName("CustomerCountryStats").getOrCreate()

# Assuming 'df_customers' is your DataFrame

# Group by 'Country' column and aggregate with countDistinct on 'customer_id' and average on 'age'
country_stats = df_customers.groupBy('state').agg(
    countDistinct('customer_id').alias('customer_count'),
    avg('age').alias('avg_age')
)

# Show the results
country_stats.show()


+--------------------+--------------+------------------+
|               state|customer_count|           avg_age|
+--------------------+--------------+------------------+
|  Northern Territory|           125|            49.168|
|     South Australia|           139| 49.86330935251799|
|   Western Australia|           124| 48.70161290322581|
|            Victoria|           121| 47.83471074380165|
|     New South Wales|           132| 52.88636363636363|
|Australian Capita...|           121|50.710743801652896|
|            Tasmania|           104| 48.59615384615385|
|          Queensland|           134|50.634328358208954|
+--------------------+--------------+------------------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col



# Calculate total amount spent by each customer
df_total_amount_by_customer = df_sales.withColumn("total_amount", col("price_per_unit") * col("quantity")) \
    .groupBy("customer_id") \
    .agg(sum("total_amount").alias("total_amount_spent"))

# Show the DataFrame with total amount spent by each customer
df_total_amount_by_customer.show()


In [67]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Group by city and count the number of customers in each city
city_customer_count = df_customers.groupBy("state").agg(F.countDistinct("customer_id").alias("customer_count"))
# Sort cities by customer count in descending order
sorted_cities = city_customer_count.orderBy(F.desc("customer_count"))
sorted_cities.show()
most_customers_city = sorted_cities.select("state", "customer_count").first()
# Show the city with the most customer count
print(f"State with the most customers: {most_customers_city['state']} ({most_customers_city['customer_count']} customers)")

+--------------------+--------------+
|               state|customer_count|
+--------------------+--------------+
|     South Australia|           139|
|          Queensland|           134|
|     New South Wales|           132|
|  Northern Territory|           125|
|   Western Australia|           124|
|            Victoria|           121|
|Australian Capita...|           121|
|            Tasmania|           104|
+--------------------+--------------+

State with the most customers: South Australia (139 customers)


In [21]:

from pyspark.sql.functions import countDistinct, sum
# Group by 'Country' column and aggregate with countDistinct on 'customer_id' and sum on 'age'
country_stats = df_customers.groupBy('state').agg(
    countDistinct('customer_id').alias('customer_count'),
    sum('age').alias('total_age')
)

# Show the results
country_stats.show()


+--------------------+--------------+---------+
|               state|customer_count|total_age|
+--------------------+--------------+---------+
|  Northern Territory|           125|     6146|
|     South Australia|           139|     6931|
|   Western Australia|           124|     6039|
|            Victoria|           121|     5788|
|     New South Wales|           132|     6981|
|Australian Capita...|           121|     6136|
|            Tasmania|           104|     5054|
|          Queensland|           134|     6785|
+--------------------+--------------+---------+



In [46]:
from pyspark.sql import functions as F
# Count the number of unique states
unique_states_df = df_customers.agg(F.countDistinct("state").alias("unique_states"))
unique_states_df.show()
# Count the number of unique cities
unique_cities_df = df_customers.agg(F.countDistinct("city").alias("unique_cities"))
unique_cities_df.show()



+-------------+
|unique_states|
+-------------+
|            8|
+-------------+

+-------------+
|unique_cities|
+-------------+
|          961|
+-------------+



In [47]:
# Get the distinct city names
distinct_cities_df = df_customers.select("city").distinct()

# Show the distinct city names
distinct_cities_df.show()


+----------------+
|            city|
+----------------+
|      Lake Lucas|
|     Sanfordberg|
|      Port Darcy|
|  Hackettchester|
| Lake Lillyville|
|     Lake Joshua|
|  South Zacmouth|
|   Dickinsonside|
| New Alexchester|
|      Chloeville|
|   Turnerborough|
|      Walterfurt|
|      Lake Jesse|
|    Samanthaberg|
|Lake Callumville|
|      Justinport|
|      Audreyport|
|      Evansburgh|
|        West Kai|
|Greenfeldershire|
+----------------+
only showing top 20 rows



In [50]:
from pyspark.sql import functions as F

# Calculate the average age of customers
avg_age_df = df_customers.agg(F.avg("age").alias("average_age"))
avg_age_df.show()


+-----------+
|average_age|
+-----------+
|      49.86|
+-----------+



In [49]:
# Get the distinct city names
distinct_cities_df = df_customers.select("state").distinct()

# Show the distinct city names
distinct_cities_df.show()

+--------------------+
|               state|
+--------------------+
|  Northern Territory|
|     South Australia|
|   Western Australia|
|            Victoria|
|     New South Wales|
|Australian Capita...|
|            Tasmania|
|          Queensland|
+--------------------+



In [79]:
# Perform the join operation using customer_id
df_merged2 = df_merged.join(df_customers, on='customer_id', how='left')

# Show the merged DataFrame
df_merged2.show(truncate=False)


+-----------+--------+-------+----------+-------------+----------------------+-------------------+-----------+---+------------------------------+--------+------------------+------------------+---------+
|customer_id|order_id|payment|order_date|delivery_date|total_amount_per_order|customer_name      |gender     |age|home_address                  |zip_code|city              |state             |country  |
+-----------+--------+-------+----------+-------------+----------------------+-------------------+-----------+---+------------------------------+--------+------------------+------------------+---------+
|64         |1       |30811  |2021-08-30|2021-09-24   |1487                  |Annabella Devote   |Genderfluid|75 |4927 Alice MeadowApt. 960     |7787    |Sanfordborough    |South Australia   |Australia|
|473        |2       |50490  |2021-02-03|2021-02-13   |1130                  |Lori Briars        |Male       |61 |531 Schmitt BoulevardApt. 010 |1744    |Annaton           |South Australia

In [81]:
# Assuming df_merged is your DataFrame containing merged data

# Select specific columns to show
df_selected = df_merged2.select("customer_id", "customer_name", "total_amount_per_order")

# Show the selected columns
df_selected.show(truncate=False)


+-----------+-------------------+----------------------+
|customer_id|customer_name      |total_amount_per_order|
+-----------+-------------------+----------------------+
|64         |Annabella Devote   |1487                  |
|473        |Lori Briars        |1130                  |
|774        |Ellynn Korba       |508                   |
|433        |Candis Roswarne    |976                   |
|441        |Artemas Vasilischev|2043                  |
|800        |Katusha Ceney      |732                   |
|626        |Tann Scothron      |523                   |
|58         |Aurie Margett      |299                   |
|852        |Lucas Cromly       |1315                  |
|659        |Ira Dafforne       |874                   |
|785        |Jillana Blankau    |461                   |
|120        |Didi Garton        |714                   |
|204        |Bird McGarvie      |1583                  |
|957        |Felic Marrian      |1254                  |
|468        |Maegan Ashton     

In [35]:
file_path = r"C:\Users\abdel\OneDrive\Desktop\Sub\BigData\Csv. files\orders.csv"
# Read the CSV file into a DataFrame
df_orders = spark.read.csv(file_path, header=True, inferSchema=True)
df_orders.show(10,0)
df_orders.printSchema()
print("Count of dataframe:",df_orders.count())

+--------+-----------+-------+-------------------+-------------+
|order_id|customer_id|payment|order_date         |delivery_date|
+--------+-----------+-------+-------------------+-------------+
|1       |64         |30811  |2021-08-30 00:00:00|2021-09-24   |
|2       |473        |50490  |2021-02-03 00:00:00|2021-02-13   |
|3       |774        |46763  |2021-10-08 00:00:00|2021-11-03   |
|4       |433        |39782  |2021-05-06 00:00:00|2021-05-19   |
|5       |441        |14719  |2021-03-23 00:00:00|2021-03-24   |
|6       |800        |16197  |2021-09-09 00:00:00|2021-10-05   |
|7       |626        |37666  |2021-04-05 00:00:00|2021-04-11   |
|8       |58         |28484  |2021-04-12 00:00:00|2021-05-01   |
|9       |852        |12896  |2021-05-01 00:00:00|2021-05-11   |
|10      |659        |21922  |2021-10-15 00:00:00|2021-10-16   |
+--------+-----------+-------+-------------------+-------------+
only showing top 10 rows

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: 

In [69]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
# Assuming df_orders is your DataFrame containing order data with 'customer_id' column
# Group by 'customer_id' and aggregate with count on 'order_id' to get order frequency
df_order_frequency = df_orders.groupBy('customer_id').agg(count('order_id').alias('order_frequency'))
# Show the results
df_order_frequency.show(30)


+-----------+---------------+
|customer_id|order_frequency|
+-----------+---------------+
|        471|              1|
|        833|              2|
|        463|              2|
|        148|              1|
|        496|              1|
|        623|              3|
|        737|              2|
|        516|              1|
|        251|              1|
|         85|              1|
|        808|              1|
|        458|              1|
|        883|              1|
|        588|              2|
|        799|              2|
|        898|              2|
|        970|              1|
|        133|              1|
|        853|              1|
|        472|              2|
|        513|              1|
|         78|              2|
|        918|              1|
|        321|              1|
|        673|              1|
|        857|              1|
|        974|              1|
|        362|              1|
|        876|              1|
|        375|              2|
+---------

In [70]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

# Assuming df_orders is your DataFrame containing order data with 'customer_id' column
# Assuming df_order_frequency is your DataFrame containing order frequency data with 'customer_id' and 'order_frequency' columns

# Perform the join based on 'customer_id'
df_orders_with_frequency = df_orders.join(df_order_frequency, 'customer_id', 'left')

# Show the joined DataFrame
df_orders_with_frequency.show(5)


+-----------+--------+-------+----------+-------------+---------------+
|customer_id|order_id|payment|order_date|delivery_date|order_frequency|
+-----------+--------+-------+----------+-------------+---------------+
|         64|       1|  30811|2021-08-30|   2021-09-24|              1|
|        473|       2|  50490|2021-02-03|   2021-02-13|              2|
|        774|       3|  46763|2021-10-08|   2021-11-03|              3|
|        433|       4|  39782|2021-05-06|   2021-05-19|              3|
|        441|       5|  14719|2021-03-23|   2021-03-24|              2|
+-----------+--------+-------+----------+-------------+---------------+
only showing top 5 rows



In [53]:
from pyspark.sql.functions import col, to_date

# Assuming 'order_date' is your timestamp column
df_orders= df_orders.withColumn('order_date', to_date(col('order_date')))

# Show the updated schema to verify the data type change
df_orders.printSchema()
df_orders. show()

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- payment: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- delivery_date: date (nullable = true)

+--------+-----------+-------+----------+-------------+
|order_id|customer_id|payment|order_date|delivery_date|
+--------+-----------+-------+----------+-------------+
|       1|         64|  30811|2021-08-30|   2021-09-24|
|       2|        473|  50490|2021-02-03|   2021-02-13|
|       3|        774|  46763|2021-10-08|   2021-11-03|
|       4|        433|  39782|2021-05-06|   2021-05-19|
|       5|        441|  14719|2021-03-23|   2021-03-24|
|       6|        800|  16197|2021-09-09|   2021-10-05|
|       7|        626|  37666|2021-04-05|   2021-04-11|
|       8|         58|  28484|2021-04-12|   2021-05-01|
|       9|        852|  12896|2021-05-01|   2021-05-11|
|      10|        659|  21922|2021-10-15|   2021-10-16|
|      11|        785|  36624|2021-06-15|   2021-06-30|
| 

In [56]:
from pyspark.sql import functions as F

# Aggregations for order_df DataFrame
order_aggregations = df_orders.agg(
    F.countDistinct("order_id").alias("unique_orders_count"),
    F.countDistinct("customer_id").alias("unique_customers_count"),   
)
# Show the aggregated results
order_aggregations.show()


+-------------------+----------------------+----------------------+------------+-------------------------+
|unique_orders_count|unique_customers_count|unique_payment_methods|total_orders|orders_with_delivery_date|
+-------------------+----------------------+----------------------+------------+-------------------------+
|               1000|                   617|                   995|        1000|                     1000|
+-------------------+----------------------+----------------------+------------+-------------------------+



In [43]:
orders_per_day = df_orders.groupBy("order_date").count().orderBy("order_date")
orders_per_day.show() 

+----------+-----+
|order_date|count|
+----------+-----+
|2021-01-01|    3|
|2021-01-02|    7|
|2021-01-03|    3|
|2021-01-04|    3|
|2021-01-05|    5|
|2021-01-06|    2|
|2021-01-07|    5|
|2021-01-08|    2|
|2021-01-09|    5|
|2021-01-10|    6|
|2021-01-11|    4|
|2021-01-12|    7|
|2021-01-13|    4|
|2021-01-14|    3|
|2021-01-15|    3|
|2021-01-16|    1|
|2021-01-17|    2|
|2021-01-18|    2|
|2021-01-19|    1|
|2021-01-20|    4|
+----------+-----+
only showing top 20 rows



In [24]:
df_orders.select('customer_id').distinct().count()

617

In [25]:
file_path = r"C:\Users\abdel\OneDrive\Desktop\Sub\BigData\Csv. files\sales.csv"
df_sales = spark.read.csv(file_path, header=True, inferSchema=True)
df_sales.show(10,0)
df_sales.printSchema()
print("Count of dataframe:",df_sales.count())

+--------+--------+----------+--------------+--------+-----------+
|sales_id|order_id|product_id|price_per_unit|quantity|total_price|
+--------+--------+----------+--------------+--------+-----------+
|0       |1       |218       |106           |2       |212        |
|1       |1       |481       |118           |1       |118        |
|2       |1       |2         |96            |3       |288        |
|3       |1       |1002      |106           |2       |212        |
|4       |1       |691       |113           |3       |339        |
|5       |1       |981       |106           |3       |318        |
|6       |2       |915       |96            |1       |96         |
|7       |2       |686       |113           |1       |113        |
|8       |2       |1091      |115           |3       |345        |
|9       |2       |1196      |105           |1       |105        |
+--------+--------+----------+--------------+--------+-----------+
only showing top 10 rows

root
 |-- sales_id: integer (nullabl

In [71]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Create a Spark session
spark = SparkSession.builder \
    .appName("TotalAmountPerOrder") \
    .getOrCreate()

# Assuming df_sales is your DataFrame with sales data
# Calculate the total amount spent per order ID
df_total_amount = df_sales.withColumn("total_amount", col("price_per_unit") * col("quantity")) \
    .groupBy("order_id") \
    .agg(sum("total_amount").alias("total_amount_per_order")) \
    .orderBy("order_id")

# Show the total amount spent per order ID
df_total_amount.show(truncate=False)


+--------+----------------------+
|order_id|total_amount_per_order|
+--------+----------------------+
|1       |1487                  |
|2       |1130                  |
|3       |508                   |
|4       |976                   |
|5       |2043                  |
|6       |732                   |
|7       |523                   |
|8       |299                   |
|9       |1315                  |
|10      |874                   |
|11      |461                   |
|12      |714                   |
|13      |1583                  |
|14      |1254                  |
|15      |1365                  |
|16      |1786                  |
|17      |670                   |
|18      |952                   |
|19      |1244                  |
|20      |1557                  |
+--------+----------------------+
only showing top 20 rows



In [72]:
# Assuming df_total_amount is your DataFrame with total amount per order ID
# Assuming df_orders is your DataFrame with order details

# Perform the join operation
df_merged = df_orders.join(df_total_amount, on='order_id', how='left')

# Show the merged DataFrame
df_merged.show(truncate=False)


+--------+-----------+-------+----------+-------------+----------------------+
|order_id|customer_id|payment|order_date|delivery_date|total_amount_per_order|
+--------+-----------+-------+----------+-------------+----------------------+
|1       |64         |30811  |2021-08-30|2021-09-24   |1487                  |
|2       |473        |50490  |2021-02-03|2021-02-13   |1130                  |
|3       |774        |46763  |2021-10-08|2021-11-03   |508                   |
|4       |433        |39782  |2021-05-06|2021-05-19   |976                   |
|5       |441        |14719  |2021-03-23|2021-03-24   |2043                  |
|6       |800        |16197  |2021-09-09|2021-10-05   |732                   |
|7       |626        |37666  |2021-04-05|2021-04-11   |523                   |
|8       |58         |28484  |2021-04-12|2021-05-01   |299                   |
|9       |852        |12896  |2021-05-01|2021-05-11   |1315                  |
|10      |659        |21922  |2021-10-15|2021-10-16 

In [26]:
df_sales .select('order_id').distinct().count()

993

In [32]:
df_sales.describe(["price_per_unit", "quantity", "total_price"]).show()


+-------+-----------------+------------------+-----------------+
|summary|   price_per_unit|          quantity|      total_price|
+-------+-----------------+------------------+-----------------+
|  count|             5000|              5000|             5000|
|   mean|         103.5016|            1.9924|           206.36|
| stddev|9.195004462283432|0.8075101575403906|86.35745666741475|
|    min|               90|                 1|               90|
|    max|              119|                 3|              357|
+-------+-----------------+------------------+-----------------+



In [27]:
sales_by_product = df_sales.groupBy('product_id').sum('total_price').orderBy('sum(total_price)', ascending=False)
sales_by_product.show()

+----------+----------------+
|product_id|sum(total_price)|
+----------+----------------+
|        78|            2832|
|       472|            2714|
|       707|            2499|
|       579|            2400|
|       843|            2373|
|       486|            2360|
|       740|            2289|
|       727|            2261|
|       182|            2254|
|       465|            2124|
|        95|            2124|
|        74|            2124|
|       810|            2106|
|       316|            2071|
|       405|            2023|
|       222|            2014|
|       830|            1989|
|      1188|            1980|
|      1184|            1980|
|      1091|            1955|
+----------+----------------+
only showing top 20 rows



In [51]:
from pyspark.sql import functions as F

# Aggregations for df_sales DataFrame
sales_aggregations = df_sales.agg(
    F.countDistinct("sales_id").alias("unique_sales_count"),
    F.countDistinct("order_id").alias("unique_orders_count"),
    F.countDistinct("product_id").alias("unique_products_count"),
    F.avg("price_per_unit").alias("avg_price_per_unit"),
    (F.sum("price_per_unit") * F.sum("quantity")).alias("total_revenue"),
    F.sum("quantity").alias("total_quantity_sold")
)

# Show the aggregated results
sales_aggregations.show()



+------------------+-------------------+---------------------+------------------+-------------+-------------------+
|unique_sales_count|unique_orders_count|unique_products_count|avg_price_per_unit|total_revenue|total_quantity_sold|
+------------------+-------------------+---------------------+------------------+-------------+-------------------+
|              5000|                993|                 1233|          103.5016|   5155414696|               9962|
+------------------+-------------------+---------------------+------------------+-------------+-------------------+



In [28]:
file_path = r"C:\Users\abdel\OneDrive\Desktop\Sub\BigData\Csv. files\products.csv"
df_products = spark.read.csv(file_path, header=True, inferSchema=True)
# Show the first 10 rows of the DataFrame
df_products.show(10,0)
df_products.printSchema()
print("Count of dataframe:",df_products.count())

+----------+------------+------------+----+------+-----+--------+-----------------------------------------------+
|product_ID|product_type|product_name|size|colour|price|quantity|description                                    |
+----------+------------+------------+----+------+-----+--------+-----------------------------------------------+
|0         |Shirt       |Oxford Cloth|XS  |red   |114  |66      |A red coloured, XS sized, Oxford Cloth Shirt   |
|1         |Shirt       |Oxford Cloth|S   |red   |114  |53      |A red coloured, S sized, Oxford Cloth Shirt    |
|2         |Shirt       |Oxford Cloth|M   |red   |114  |54      |A red coloured, M sized, Oxford Cloth Shirt    |
|3         |Shirt       |Oxford Cloth|L   |red   |114  |69      |A red coloured, L sized, Oxford Cloth Shirt    |
|4         |Shirt       |Oxford Cloth|XL  |red   |114  |47      |A red coloured, XL sized, Oxford Cloth Shirt   |
|5         |Shirt       |Oxford Cloth|XS  |orange|114  |45      |A orange coloured, XS s

In [75]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Calculate total price for each product
df_total_price = df_products.withColumn("total_price", col("quantity") * col("price"))

# Show the DataFrame with total price
df_total_price.show(truncate=False)


+----------+------------+------------+----+------+-----+--------+-----------------------------------------------+-----------+
|product_ID|product_type|product_name|size|colour|price|quantity|description                                    |total_price|
+----------+------------+------------+----+------+-----+--------+-----------------------------------------------+-----------+
|0         |Shirt       |Oxford Cloth|XS  |red   |114  |66      |A red coloured, XS sized, Oxford Cloth Shirt   |7524       |
|1         |Shirt       |Oxford Cloth|S   |red   |114  |53      |A red coloured, S sized, Oxford Cloth Shirt    |6042       |
|2         |Shirt       |Oxford Cloth|M   |red   |114  |54      |A red coloured, M sized, Oxford Cloth Shirt    |6156       |
|3         |Shirt       |Oxford Cloth|L   |red   |114  |69      |A red coloured, L sized, Oxford Cloth Shirt    |7866       |
|4         |Shirt       |Oxford Cloth|XL  |red   |114  |47      |A red coloured, XL sized, Oxford Cloth Shirt   |5358 

In [78]:
df_total_price_by_type_color = df_products.withColumn("total_price", col("quantity") * col("price")) \
    .groupBy("product_type", "colour") \
    .agg(sum("total_price").alias("total_price"))

# Show the DataFrame with total price by product type and color
df_total_price_by_type_color.show(truncate=False)

+------------+------+-----------+
|product_type|colour|total_price|
+------------+------+-----------+
|Shirt       |red   |389840     |
|Trousers    |orange|375139     |
|Jacket      |orange|403578     |
|Trousers    |violet|368337     |
|Jacket      |green |389454     |
|Trousers    |blue  |353392     |
|Jacket      |red   |386938     |
|Jacket      |violet|403096     |
|Trousers    |yellow|367561     |
|Shirt       |yellow|399579     |
|Jacket      |yellow|377513     |
|Shirt       |blue  |393531     |
|Shirt       |indigo|392906     |
|Jacket      |blue  |387448     |
|Jacket      |indigo|381438     |
|Shirt       |orange|382718     |
|Trousers    |indigo|348804     |
|Trousers    |red   |381457     |
|Trousers    |green |364529     |
|Shirt       |violet|393587     |
+------------+------+-----------+
only showing top 20 rows



In [29]:
df_customers.describe().show()
df_sales.describe().show()


+-------+-----------------+--------------+----------+------------------+--------------------+-----------------+---------+--------------------+---------+
|summary|      customer_id| customer_name|    gender|               age|        home_address|         zip_code|     city|               state|  country|
+-------+-----------------+--------------+----------+------------------+--------------------+-----------------+---------+--------------------+---------+
|  count|             1000|          1000|      1000|              1000|                1000|             1000|     1000|                1000|     1000|
|   mean|            500.5|          null|      null|             49.86|                null|         5004.872|     null|                null|     null|
| stddev|288.8194360957494|          null|      null|17.647828360618387|                null|2884.497332027621|     null|                null|     null|
|    min|                1|Abbot Rickaert|   Agender|                20|00 Fadel C

In [58]:
from pyspark.sql import functions as F

# Aggregations for product DataFrame
product_aggregations = df_products.agg(
    F.countDistinct("product_ID").alias("unique_products_count"),
    F.count("product_type").alias("total_products"),
    F.avg("price").alias("average_price"),
    F.sum("quantity").alias("total_quantity")
)

# Show the aggregated results
product_aggregations.show()


+---------------------+--------------+------------------+--------------+
|unique_products_count|total_products|     average_price|total_quantity|
+---------------------+--------------+------------------+--------------+
|                 1260|          1260|105.80555555555556|         75789|
+---------------------+--------------+------------------+--------------+



In [60]:

# Count of Products by Type
product_type_counts = df_products.groupBy("product_type").count().orderBy("count", ascending=False)
# Show product type counts
product_type_counts.show()
# Statistical Analysis by Product Type (example: average price)
avg_price_by_type = df_products.groupBy("product_type").agg(F.avg("price").alias("avg_price"))
# Show average price by product type
avg_price_by_type.show()

+------------+-----+
|product_type|count|
+------------+-----+
|      Jacket|  420|
|    Trousers|  420|
|       Shirt|  420|
+------------+-----+

+------------+------------------+
|product_type|         avg_price|
+------------+------------------+
|      Jacket|107.41666666666667|
|    Trousers|101.66666666666667|
|       Shirt|108.33333333333333|
+------------+------------------+



In [30]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Join Example") \
    .getOrCreate()

# Assuming df_orders and df_customers are your Spark DataFrames

# Perform inner join on 'customer_id' column
merged_df = df_orders.join(df_customers, on='customer_id', how='inner')

# Show the resulting DataFrame
merged_df.show(5)


+-----------+--------+-------+----------+-------------+-----------------+--------+---+--------------------+--------+-------------+------------------+---------+
|customer_id|order_id|payment|order_date|delivery_date|    customer_name|  gender|age|        home_address|zip_code|         city|             state|  country|
+-----------+--------+-------+----------+-------------+-----------------+--------+---+--------------------+--------+-------------+------------------+---------+
|          1|     729|  35593|2021-02-18|   2021-03-01|    Leanna Busson|  Female| 30|8606 Victoria Ter...|    5464|Johnstonhaven|Northern Territory|Australia|
|          1|     670|  10246|2021-03-06|   2021-04-01|    Leanna Busson|  Female| 30|8606 Victoria Ter...|    5464|Johnstonhaven|Northern Territory|Australia|
|          1|     455|  24550|2021-04-04|   2021-04-06|    Leanna Busson|  Female| 30|8606 Victoria Ter...|    5464|Johnstonhaven|Northern Territory|Australia|
|          7|     465|  48935|2021-05-21