In [161]:
import os
import sys
# Point PySpark's worker and driver interpreter settings at the Python
# executable that is running this kernel.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [177]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime
import pandas as pd

In [165]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("FinalDZ")
    .getOrCreate()
)

In [179]:
# Load the source CSV: the first row is the header, column types are inferred.
# NOTE(review): inferSchema will presumably type "Postal Code" as an integer,
# which would drop leading zeros - confirm that this is acceptable.
df = spark.read.csv(
    "./SampleSuperstore.csv",
    header=True,
    inferSchema=True,
)

In [181]:
# Display the version of the running Spark session.
spark.version

'3.5.4'

In [183]:
# Preview the first 5 rows of the loaded DataFrame.
df.show(5)

+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+--------+--------+--------+--------+
|     Ship Mode|  Segment|      Country|           City|     State|Postal Code|Region|       Category|Sub-Category|   Sales|Quantity|Discount|  Profit|
+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+--------+--------+--------+--------+
|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|   Bookcases|  261.96|       2|     0.0| 41.9136|
|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|      Chairs|  731.94|       3|     0.0| 219.582|
|  Second Class|Corporate|United States|    Los Angeles|California|      90036|  West|Office Supplies|      Labels|   14.62|       2|     0.0|  6.8714|
|Standard Class| Consumer|United States|Fort Lauderdale|   Florida|      33311| South|  

In [185]:
# Hash function and definition of the current date
def hashing(*columns):
    """Return a SHA-256 hex digest Column over the given columns.

    Each input is cast to string, NULL is replaced by an empty string, and the
    values are joined with "|" before hashing.

    Why the NULL handling: ``concat_ws`` skips NULL inputs entirely, so without
    it ``("a", NULL, "b")`` and ``("a", "b", NULL)`` would concatenate to the
    same text and collide on the same hash. Keeping an empty placeholder
    preserves each value's position. Rows without NULLs hash exactly as before.

    Parameters
    ----------
    *columns : str or Column
        Column names or Column expressions, in the order they should be hashed.
        Inputs are treated as scalar values (they are cast to string).

    Returns
    -------
    Column
        ``sha2(..., 256)`` of the "|"-joined, NULL-safe string values.
    """
    null_safe = [
        coalesce((col(c) if isinstance(c, str) else c).cast("string"), lit(""))
        for c in columns
    ]
    return sha2(concat_ws("|", *null_safe), 256)

# Load date stamped on every record: today's local date as "YYYY-MM-DD".
as_of_day = datetime.now().date().isoformat()
print(as_of_day)

2025-10-16


In [193]:
# Hubs
def _build_hub(source_df, key_columns, id_column):
    """Build one hub table from the distinct values of a business key.

    Parameters
    ----------
    source_df : DataFrame
        Frame that contains the business-key columns.
    key_columns : list of str
        Column names that form the business key, in hashing order.
    id_column : str
        Name of the readable key column (key values joined with "_").
        The hash-key column is named "hk_" + id_column.

    Returns
    -------
    DataFrame
        Columns: [id_column, "hk_" + id_column, "as_of_day"].
    """
    hash_column = f"hk_{id_column}"
    return (
        source_df
        .select(*key_columns, concat_ws("_", *key_columns).alias(id_column))
        .distinct()
        .withColumn(hash_column, hashing(*key_columns))
        .withColumn("as_of_day", lit(as_of_day))
        .select(id_column, hash_column, "as_of_day")
    )


customers_hub = _build_hub(df, ["Segment", "City", "Postal Code"], "customer_id")

products_hub = _build_hub(df, ["Category", "Sub-Category"], "product_id")

locations_hub = _build_hub(df, ["City", "State", "Postal Code"], "location_id")

sales_hub = _build_hub(df, ["Ship Mode", "Sales", "Quantity"], "sale_id")

In [195]:
# Links
# One row per distinct combination of customer, product, location and sale keys.
# Every source column is selected once, and only the columns that reach the
# final projection are computed.
sales_link = (
    df.select(
        "Segment", "City", "State", "Postal Code",
        "Category", "Sub-Category",
        "Ship Mode", "Sales", "Quantity",
    )
    .distinct()
    # Hash keys use the same column lists as the corresponding hubs.
    .withColumn("hk_customer_id", hashing("Segment", "City", "Postal Code"))
    .withColumn("hk_product_id", hashing("Category", "Sub-Category"))
    .withColumn("hk_location_id", hashing("City", "State", "Postal Code"))
    .withColumn("hk_sale_id", hashing("Ship Mode", "Sales", "Quantity"))
    # The link key is the hash of the four hub hash keys, in this order.
    .withColumn("link_id", hashing(
        col("hk_customer_id"),
        col("hk_product_id"),
        col("hk_location_id"),
        col("hk_sale_id"),
    ))
    .withColumn("as_of_day", lit(as_of_day))
    .select(
        "link_id",
        "hk_customer_id", "hk_product_id", "hk_location_id", "hk_sale_id",
        "as_of_day",
    )
)

In [197]:
# Satellites
def _snake_case(name):
    """Lower-case a source column name and turn spaces/hyphens into "_"."""
    return name.lower().replace(" ", "_").replace("-", "_")


def _build_satellite(source_df, hash_key_column, key_columns, attribute_columns):
    """Build one satellite table from the distinct attribute rows.

    Parameters
    ----------
    source_df : DataFrame
        Frame that contains the attribute columns.
    hash_key_column : str
        Name of the parent hash-key column to produce (e.g. "hk_customer_id").
    key_columns : list of str
        Business-key columns hashed into ``hash_key_column``; must be a subset
        of ``attribute_columns``.
    attribute_columns : list of str
        Descriptive columns to keep. They are hashed together, in this order,
        into "hash_diff" and emitted under snake_case names.

    Returns
    -------
    DataFrame
        Columns: [hash_key_column, "as_of_day", "hash_diff", *snake_case attrs].
    """
    renamed_attributes = [col(c).alias(_snake_case(c)) for c in attribute_columns]
    return (
        source_df
        .select(*attribute_columns)
        .distinct()
        .withColumn(hash_key_column, hashing(*key_columns))
        .withColumn("as_of_day", lit(as_of_day))
        .withColumn("hash_diff", hashing(*attribute_columns))
        .select(hash_key_column, "as_of_day", "hash_diff", *renamed_attributes)
    )


customer_sat = _build_satellite(
    df,
    "hk_customer_id",
    ["Segment", "City", "Postal Code"],
    ["Segment", "City", "State", "Postal Code", "Region", "Country"],
)

product_sat = _build_satellite(
    df,
    "hk_product_id",
    ["Category", "Sub-Category"],
    ["Category", "Sub-Category"],
)

location_sat = _build_satellite(
    df,
    "hk_location_id",
    ["City", "State", "Postal Code"],
    ["City", "State", "Postal Code", "Region", "Country"],
)

sales_sat = _build_satellite(
    df,
    "hk_sale_id",
    ["Ship Mode", "Sales", "Quantity"],
    ["Ship Mode", "Sales", "Quantity", "Discount", "Profit"],
)

In [199]:
# Save every table to its own CSV file
# The result of to_csv(path) is deliberately not stored: it returns None when
# a path is given, so there is nothing useful to bind to a variable.
_OUTPUT_TABLES = {
    # Hubs
    "./hub_customers.csv": customers_hub,
    "./hub_products.csv": products_hub,
    "./hub_locations.csv": locations_hub,
    "./hub_sales.csv": sales_hub,
    # Links
    "./link_sales.csv": sales_link,
    # Satellites
    "./sat_customers.csv": customer_sat,
    "./sat_products.csv": product_sat,
    "./sat_locations.csv": location_sat,
    "./sat_sales.csv": sales_sat,
}

for output_path, table in _OUTPUT_TABLES.items():
    # toPandas() collects the whole table onto the driver before it is written.
    table.toPandas().to_csv(output_path, index=False)

In [201]:
# Stop the Spark session created earlier in the notebook.
spark.stop()