In [None]:
from pyspark.sql import SparkSession

# Start the Spark session for this Colab notebook (reuses an existing one if active)
spark = (
    SparkSession.builder
    .appName("PySparkGoogleColab")
    .getOrCreate()
)

# Version check: displayed as the cell's output
spark.version


'3.5.5'

In [None]:
from google.colab import drive
from pyspark.sql import SparkSession

# Obtain the Spark session; getOrCreate returns the already-active session if there is one
spark = (
    SparkSession.builder
    .appName("PySparkGoogleColab")
    .getOrCreate()
)

# Mount Google Drive so the input files are reachable from the Colab VM
drive.mount('/content/drive')

# Directory with the input parquet files. The trailing slash matters because
# file names are appended to this string directly.
file_path = "/content/drive/My Drive/spark_data/"


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def _read_table(table_name):
    """Load ``<file_path><table_name>.parquet`` as a Spark DataFrame."""
    return spark.read.parquet(f"{file_path}{table_name}.parquet")


# Load every table of the dataset from its parquet file
customer_df = _read_table("customer")
date_df = _read_table("date")
product_df = _read_table("product")
purchase_df = _read_table("purchase")
region_df = _read_table("region")
retailer_df = _read_table("retailer")
sale_df = _read_table("sale")
supplier_df = _read_table("supplier")

# Sanity check: preview the first three rows of the tables used below
for preview_df in (product_df, region_df, sale_df, retailer_df):
    preview_df.show(3)

+----------+------------+------------+------+------+--------+----------+
|product_id|product_code|product_type|colour|  size|material|unit_price|
+----------+------------+------------+------+------+--------+----------+
|         1| BTS001BLC-S|      Tshirt| Black| Small|  Cotton|        25|
|         2| BTS001BLC-M|      Tshirt| Black|Medium|  Cotton|        25|
|         3| BTS001BLC-L|      Tshirt| Black| Large|  Cotton|        25|
+----------+------------+------------+------+------+--------+----------+
only showing top 3 rows

+-------+--------------+-----------------+
|city_id|     city_name|      region_name|
+-------+--------------+-----------------+
|      1|         Adana|       Ic Anadolu|
|      2|      Adiyaman|Guneydogu Anadolu|
|      3|Afyonkarahisar|       Ic Anadolu|
+-------+--------------+-----------------+
only showing top 3 rows

+--------+----------+-----------+-----------+--------+---------+----------+
|order_id|product_id|customer_id|retailer_id|quantity|total_am

In [None]:
# Preview every product of type "Jacket"
product_df.filter(product_df.product_type == "Jacket").show()

# Inspect the join inputs' columns AND their types. show(0) printed only an
# empty header, which hides dtypes. The previews show retailer ids as floats
# (e.g. 1.0, 34.0) while sale/region ids appear as integers, and those ids are
# the join keys used below.
sale_df.printSchema()
retailer_df.printSchema()
region_df.printSchema()

+----------+-------------+------------+------+------+--------+----------+
|product_id| product_code|product_type|colour|  size|material|unit_price|
+----------+-------------+------------+------+------+--------+----------+
|       169| BJKT001BLC-S|      Jacket| Black| Small|  Cotton|        50|
|       170| BJKT001BLC-M|      Jacket| Black|Medium|  Cotton|        50|
|       171| BJKT001BLC-L|      Jacket| Black| Large|  Cotton|        50|
|       172|BJKT001BLC-XL|      Jacket| Black|XLarge|  Cotton|        50|
|       173|  BJKT001WC-S|      Jacket| White| Small|  Cotton|        50|
|       174|  BJKT001WC-M|      Jacket| White|Medium|  Cotton|        50|
|       175|  BJKT001WC-L|      Jacket| White| Large|  Cotton|        50|
|       176| BJKT001WC-XL|      Jacket| White|XLarge|  Cotton|        50|
|       177|  BJKT001BC-S|      Jacket|  Blue| Small|  Cotton|        50|
|       178|  BJKT001BC-M|      Jacket|  Blue|Medium|  Cotton|        50|
|       179|  BJKT001BC-L|      Jacket

In [None]:
from pyspark.sql import functions as F

# Total quantity of Jacket products sold per region (DataFrame API):
# sale -> product (filter Jacket) -> retailer (retailer_id) -> region (city_id)
jacket_df = product_df.filter(product_df.product_type == "Jacket")
jacket_sales_df = sale_df.join(jacket_df, on="product_id", how="inner")
jacket_sales_retailer_df = jacket_sales_df.join(retailer_df, on="retailer_id", how="inner")
jacket_sales_region_df = jacket_sales_retailer_df.join(region_df, on="city_id", how="inner")

# Use the namespaced F.sum. The previous `from pyspark.sql.functions import sum`
# shadowed Python's built-in sum() for every later cell in this kernel.
jacket_sales_summary = (
    jacket_sales_region_df
    .groupBy("region_name")
    .agg(F.sum("quantity").alias("total_sales"))
)
jacket_sales_summary.show()



+-----------+-----------+
|region_name|total_sales|
+-----------+-----------+
|        Ege|        632|
|    Marmara|       5261|
+-----------+-----------+



In [None]:
# Register the DataFrames as temporary views so Spark SQL can refer to them by name
for view_name, view_df in (
    ("product", product_df),
    ("sale", sale_df),
    ("retailer", retailer_df),
    ("region", region_df),
):
    view_df.createOrReplaceTempView(view_name)

# Jacket quantity sold per region, computed with Spark SQL instead of the DataFrame API
query = """
SELECT r.region_name, SUM(s.quantity) AS total_sales
FROM sale s
JOIN product p ON s.product_id = p.product_id
JOIN retailer t ON s.retailer_id = t.retailer_id
JOIN region r ON t.city_id = r.city_id
WHERE p.product_type = 'Jacket'
GROUP BY r.region_name
"""

jacket_sales_summary_sql = spark.sql(query)
jacket_sales_summary_sql.show()


+-----------+-----------+
|region_name|total_sales|
+-----------+-----------+
|        Ege|        632|
|    Marmara|       5261|
+-----------+-----------+



In [None]:
# Peek at one row of each join input to see its key columns and value formats
for peek_df in (sale_df, retailer_df, region_df):
    peek_df.show(1)






+--------+----------+-----------+-----------+--------+---------+----------+
|order_id|product_id|customer_id|retailer_id|quantity|total_amt|      date|
+--------+----------+-----------+-----------+--------+---------+----------+
|       1|       241|        551|          2|       1|       32|2023-08-30|
+--------+----------+-----------+-----------+--------+---------+----------+
only showing top 1 row

+-----------+-------+-------------+-------------+
|retailer_id|city_id|retailer_type|retailer_name|
+-----------+-------+-------------+-------------+
|        1.0|   34.0|     Internet|            A|
+-----------+-------+-------------+-------------+
only showing top 1 row

+-------+---------+-----------+
|city_id|city_name|region_name|
+-------+---------+-----------+
|      1|    Adana| Ic Anadolu|
+-------+---------+-----------+
only showing top 1 row



In [None]:
from pyspark.sql import functions as F

# Keep only the needed columns, then attach each sale to its region:
# sale -> retailer (on retailer_id) -> region (on city_id)
sale_filtered_df = sale_df.select("retailer_id", "total_amt")
sales_retailer_df = sale_filtered_df.join(
    retailer_df.select("retailer_id", "city_id"),
    on="retailer_id",
    how="inner",
)
sales_retailer_region_df = sales_retailer_df.join(
    region_df.select("city_id", "region_name"),
    on="city_id",
    how="inner",
)

# Turnover of each region = sum of total_amt over its sales
region_turnover_df = (
    sales_retailer_region_df
    .groupBy("region_name")
    .agg(F.sum("total_amt").alias("total_turnover"))
)

# Region with the highest turnover (limit(1): if regions tie, only one row is kept)
max_turnover_region_df = (
    region_turnover_df
    .orderBy(F.col("total_turnover").desc())
    .limit(1)
)

max_turnover_region_df.show()




+-----------+--------------+
|region_name|total_turnover|
+-----------+--------------+
|    Marmara|        635153|
+-----------+--------------+



In [None]:
# (Re-)register the views this query needs so the cell runs on its own
for view_name, view_df in (
    ("sale", sale_df),
    ("retailer", retailer_df),
    ("region", region_df),
):
    view_df.createOrReplaceTempView(view_name)

# Region with the highest total turnover, computed with Spark SQL
query = """
    SELECT r.region_name, SUM(s.total_amt) AS total_turnover
    FROM sale s
    JOIN retailer rtr ON s.retailer_id = rtr.retailer_id
    JOIN region r ON rtr.city_id = r.city_id
    GROUP BY r.region_name
    ORDER BY total_turnover DESC
    LIMIT 1
"""

max_turnover_region_df = spark.sql(query)
max_turnover_region_df.show()


+-----------+--------------+
|region_name|total_turnover|
+-----------+--------------+
|    Marmara|        635153|
+-----------+--------------+



In [30]:
# List the files in the "Colab Notebooks" folder of the mounted Google Drive.
# NOTE(review): this path uses "MyDrive" while file_path uses "My Drive"; confirm
# both resolve on this Colab runtime and pick one spelling for consistency.
!ls /content/drive/MyDrive/Colab\ Notebooks




ElifOskanbas.ipynb    Untitled0.ipynb  Untitled2.ipynb
Spark_Homework.ipynb  Untitled1.ipynb  Untitled3.ipynb
