# spark_data_cleaning.ipynb

This notebook demonstrates **cleaning a dirty CSV dataset** using Apache Spark.

- The code has not been modified.
- A short explanation that matches the **actual purpose** of each code cell is added above it.

# Data Cleaning with Apache Spark

### Explanation
This cell uses `findspark.init` to add the Spark installation at `/opt/manual/spark` to the Python path, imports the PySpark SQL modules, and creates a `SparkSession` that runs on YARN with Hive support enabled, so that tables can later be saved to the Hive metastore.

In [1]:
import findspark
findspark.init("/opt/manual/spark")
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *

spark = (SparkSession.builder
         .appName("Apache Spark ile Veri Temizleme")
         .master("yarn")
         .enableHiveSupport()
         .getOrCreate()
        )

### Explanation
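This cell reads `/user/train/datasets/dirty_store_transactions.csv` into a DataFrame (first row as header, comma separator, inferred column types) and cleans two columns: `Date` is parsed with the `yyyy-MM-dd` pattern, and `STORE_LOCATION` keeps only its leading run of letters and whitespace, with surrounding spaces trimmed.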

In [26]:
df = spark.read.format("csv").option("header", True) \
.option("inferSchema", True) \
.option("sep", ",") \
.load("/user/train/datasets/dirty_store_transactions.csv") \
.withColumn(
    "Date", F.to_date(
        F.col("Date"), "yyyy-MM-dd")) \
.withColumn(
    "STORE_LOCATION",
    F.regexp_extract("STORE_LOCATION", r"^[A-Za-z\s]+", 0)) \
.withColumn("STORE_LOCATION", F.trim("STORE_LOCATION"))
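The `STORE_LOCATION` rule can be unit-tested without a cluster. The sketch below mirrors `regexp_extract(..., 0)` followed by `trim` in plain Python; the function name `clean_location` and the sample inputs are hypothetical, and Python's `\s` matches more Unicode whitespace than Java's, so it is only equivalent for ASCII input. It also exposes a side effect of the pattern: a name containing punctuation, such as a hypothetical `St. Louis`, is cut at the first non-letter.

```python
import re
from typing import Optional

# Same pattern the notebook passes to F.regexp_extract (group 0 = whole match).
LOCATION_PREFIX = re.compile(r"^[A-Za-z\s]+")

def clean_location(raw: Optional[str]) -> Optional[str]:
    """Plain-Python mirror of regexp_extract(col, pattern, 0) followed by trim(col)."""
    if raw is None:
        return None  # Spark propagates null inputs
    match = LOCATION_PREFIX.match(raw)
    # regexp_extract returns an empty string (not null) when nothing matches.
    prefix = match.group(0) if match else ""
    # Spark's trim() strips leading and trailing spaces.
    return prefix.strip(" ")

# Hypothetical dirty values, for illustration only.
for value in ["New York!!", "  Miami 123", "123 Denver", "St. Louis"]:
    print(repr(value), "->", repr(clean_location(value)))
```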

### Explanation
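This cell removes every `$` sign from the four price columns (`MRP`, `CP`, `DISCOUNT`, `SP`) with `regexp_replace` and casts the result to `double`. With ANSI mode disabled (the default before Spark 4.0), values that still cannot be parsed as numbers become `null` instead of raising an error.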

In [29]:
price_cols = ["MRP", "CP", "DISCOUNT", "SP"]

for col in price_cols:
    df = df.withColumn(
        col,
        F.regexp_replace(col, "[$]", "").cast("double")
    )
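A plain-Python approximation of the price rule is handy for checking which raw strings would end up as `null`. This is a sketch, not Spark's exact parser: it accepts only plain decimal numbers, whereas Java's `Double.parseDouble` (used by Spark's cast) also accepts forms such as `NaN` or `Infinity`. The name `clean_price` and the sample values are hypothetical.

```python
import re
from typing import Optional

# Plain decimal numbers only: optional sign, digits, optional fraction and exponent.
_NUMBER = re.compile(r"[+-]?(\d+(\.\d*)?|\.\d+)([eE][+-]?\d+)?")

def clean_price(raw: Optional[str]) -> Optional[float]:
    """Strip every '$' and convert to float; unparseable values become None."""
    if raw is None:
        return None
    stripped = raw.replace("$", "").strip()
    if not _NUMBER.fullmatch(stripped):
        return None  # roughly what a non-ANSI Spark cast to double yields (null)
    return float(stripped)

# Hypothetical raw values: a thousands separator is NOT removed by "[$]",
# so "$1,000" fails the cast in Spark as well.
for value in ["$31.0", "20.77", "$1,000", "N/A"]:
    print(repr(value), "->", clean_price(value))
```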

### Explanation
This cell converts the first three rows to a pandas DataFrame to preview the cleaned data. Calling `limit` first keeps the amount of data collected on the driver small.

In [30]:
df.limit(3).toPandas()

| | STORE_ID | STORE_LOCATION | PRODUCT_CATEGORY | PRODUCT_ID | MRP | CP | DISCOUNT | SP | Date |
|---|---|---|---|---|---|---|---|---|---|
| 0 | YR7220 | New York | Electronics | 12254943 | 31.0 | 20.77 | 1.86 | 29.14 | 2019-11-26 |
| 1 | YR7220 | New York | Furniture | 72619323C | 15.0 | 9.75 | 1.5 | 13.5 | 2019-11-26 |
| 2 | YR7220 | New York | Electronics | 34161682B | 88.0 | 62.48 | 4.4 | 83.6 | 2019-11-26 |


### Explanation
This cell displays the DataFrame schema (column names and data types).

In [31]:
df.printSchema()

root
 |-- STORE_ID: string (nullable = true)
 |-- STORE_LOCATION: string (nullable = true)
 |-- PRODUCT_CATEGORY: string (nullable = true)
 |-- PRODUCT_ID: string (nullable = true)
 |-- MRP: double (nullable = true)
 |-- CP: double (nullable = true)
 |-- DISCOUNT: double (nullable = true)
 |-- SP: double (nullable = true)
 |-- Date: date (nullable = true)



### Explanation
This cell counts the distinct values of `PRODUCT_CATEGORY` (7) to judge whether it is a suitable bucketing column.

In [34]:
# explore candidate columns for bucketing
df.select("PRODUCT_CATEGORY").distinct().count()

7

### Explanation
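This cell groups the data by `PRODUCT_CATEGORY` and sums the four price columns for each category. The long decimal tails in the output come from binary floating-point arithmetic, not from the data itself.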

In [37]:
df.select("PRODUCT_CATEGORY","MRP", "CP", "DISCOUNT", "SP") \
.groupBy("PRODUCT_CATEGORY") \
.agg(F.sum("MRP").alias("MRP_sum"),
     F.sum("CP").alias("CP_sum"),
     F.sum("DISCOUNT").alias("DISCOUNT_sum"),
     F.sum("SP").alias("SP_sum")
    ) \
.show()

+----------------+--------+------------------+------------------+------------------+
|PRODUCT_CATEGORY| MRP_sum|            CP_sum|      DISCOUNT_sum|            SP_sum|
+----------------+--------+------------------+------------------+------------------+
|       Education|311342.0|211041.39000000074| 18590.02999999996| 292751.9699999997|
|         Kitchen|292300.0|196763.36999999953|17597.960000000025| 274702.0400000001|
|         Fashion|308458.0| 208811.7299999984|18275.679999999957| 290182.3200000008|
|       Groceries|307999.0|208347.13999999972|18470.410000000036|289528.59000000154|
|     Electronics|286910.0|193100.90999999997|18196.699999999975| 268713.2999999993|
|       Furniture|287240.0|194438.57999999888|17134.279999999897| 270105.7199999993|
|       Cosmetics|298745.0| 201602.4099999989| 18137.24999999998|280607.74999999965|
+----------------+--------+------------------+------------------+------------------+
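Tails such as `211041.39000000074` appear because values like `0.1` have no exact binary representation, so summing many `double`s accumulates tiny errors. The sketch below reproduces the effect in plain Python with hypothetical 10-cent amounts. In Spark, a reasonable option (an assumption, not what the notebook does) is to round the aggregate with `F.round(F.sum("CP"), 2)`, or to cast the price columns with `.cast("decimal(10,2)")` instead of `.cast("double")` so the sums are exact.

```python
import math
from decimal import Decimal

prices = [0.1] * 10  # ten hypothetical 10-cent amounts

naive = sum(prices)                           # float addition accumulates rounding error
compensated = math.fsum(prices)               # error-compensated float summation
rounded = round(naive, 2)                     # comparable to F.round(..., 2) on the total
exact = sum(Decimal("0.1") for _ in prices)   # decimal arithmetic, like DecimalType

# Note: Spark's round uses HALF_UP, Python's round uses half-to-even;
# they only differ on exact ties, which do not occur here.
print(naive, compensated, rounded, exact)
```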



### Explanation
This cell counts the distinct values of `STORE_LOCATION` (5) to judge whether it is a suitable bucketing column.

In [38]:
# explore candidate columns for bucketing
df.select("STORE_LOCATION").distinct().count()

5

### Explanation
This cell groups the data by `STORE_LOCATION` and sums the four price columns for each location.

In [39]:
df.select("STORE_LOCATION","MRP", "CP", "DISCOUNT", "SP") \
.groupBy("STORE_LOCATION") \
.agg(F.sum("MRP").alias("MRP_sum"),
     F.sum("CP").alias("CP_sum"),
     F.sum("DISCOUNT").alias("DISCOUNT_sum"),
     F.sum("SP").alias("SP_sum")
    ) \
.show()

+--------------+--------+------------------+------------------+------------------+
|STORE_LOCATION| MRP_sum|            CP_sum|      DISCOUNT_sum|            SP_sum|
+--------------+--------+------------------+------------------+------------------+
|    Washington|480174.0|324520.73999999854| 29375.88000000001|450798.12000000133|
|       Houston|413217.0| 278819.5199999993| 24945.19999999983|388271.79999999923|
|         Miami|395592.0|267729.23999999836|24296.640000000105|371295.36000000115|
|      New York|441707.0| 298466.4899999972| 26283.10999999997| 415423.8899999993|
|        Denver|362304.0|244569.53999999826|21501.479999999952| 340802.5199999994|
+--------------+--------+------------------+------------------+------------------+
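The sample rows satisfy `SP = MRP - DISCOUNT` (for example, 31.0 - 1.86 = 29.14), and the aggregated totals can be checked the same way. The sketch below copies the per-location sums from the output above and reports the largest gap. Agreement at the aggregate level is consistent with, but does not prove, the identity for every row; a row-level check in Spark could be `df.filter(F.abs(F.col("MRP") - F.col("DISCOUNT") - F.col("SP")) > 0.01).count()`, where the 0.01 tolerance is an assumption.

```python
# (location, MRP_sum, DISCOUNT_sum, SP_sum) copied from the output above.
location_sums = [
    ("Washington", 480174.0, 29375.88000000001, 450798.12000000133),
    ("Houston", 413217.0, 24945.19999999983, 388271.79999999923),
    ("Miami", 395592.0, 24296.640000000105, 371295.36000000115),
    ("New York", 441707.0, 26283.10999999997, 415423.8899999993),
    ("Denver", 362304.0, 21501.479999999952, 340802.5199999994),
]

def max_gap(rows):
    """Largest |MRP - DISCOUNT - SP| across the aggregated rows."""
    return max(abs(mrp - disc - sp) for _, mrp, disc, sp in rows)

print(f"largest gap: {max_gap(location_sums):.6f}")
```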



### Explanation
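This cell saves the cleaned DataFrame as the Hive table `test1.dirty_store_transactions_tbl` in ORC format, overwriting any existing table, and prints how long the write took. Although the comment mentions bucketing by `STORE_LOCATION`, the code does not call `bucketBy`, so the table is not bucketed.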

In [46]:
# Write to Hive in ORC format. Intended bucketing column: STORE_LOCATION (not applied below)
import time
start_time = time.time()

df.write.format("orc") \
.mode("overwrite") \
.saveAsTable("test1.dirty_store_transactions_tbl")

print("--- %s seconds ---" %(time.time() - start_time))

--- 12.070496082305908 seconds ---
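The comment in the ORC cell names `STORE_LOCATION` as the bucketing column, but the write never calls `bucketBy`. A minimal sketch of a bucketed write follows; the helper name and the table name used with it are hypothetical, and 5 buckets is only a guess based on the 5 distinct locations counted earlier. Two caveats: `bucketBy` works only with `saveAsTable`, not with `save(path)`, and Spark's bucketing layout is not the same as Hive's, so Hive itself may not treat the table as bucketed.

```python
def write_bucketed_orc(df, table_name, bucket_col="STORE_LOCATION", num_buckets=5):
    """Save `df` as an ORC table bucketed and sorted by `bucket_col` (hypothetical helper)."""
    (df.write
       .format("orc")
       .bucketBy(num_buckets, bucket_col)  # assign each row to one of N buckets by hashing bucket_col
       .sortBy(bucket_col)                 # sort rows within each bucket file
       .mode("overwrite")
       .saveAsTable(table_name))           # bucketBy requires saveAsTable

# Usage (hypothetical table name):
# write_bucketed_orc(df, "test1.store_transactions_bucketed")
```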


### Explanation
This cell queries three rows back from the Hive table to confirm that the write succeeded.

In [48]:
spark.sql("select * from test1.dirty_store_transactions_tbl limit 3").show()

+--------+--------------+----------------+----------+----+-----+--------+-----+----------+
|STORE_ID|STORE_LOCATION|PRODUCT_CATEGORY|PRODUCT_ID| MRP|   CP|DISCOUNT|   SP|      Date|
+--------+--------------+----------------+----------+----+-----+--------+-----+----------+
|  YR7220|      New York|     Electronics|  12254943|31.0|20.77|    1.86|29.14|2019-11-26|
|  YR7220|      New York|       Furniture| 72619323C|15.0| 9.75|     1.5| 13.5|2019-11-26|
|  YR7220|      New York|     Electronics| 34161682B|88.0|62.48|     4.4| 83.6|2019-11-26|
+--------+--------------+----------------+----------+----+-----+--------+-----+----------+



### Explanation
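This cell prints the table's detailed metadata with `DESCRIBE FORMATTED`: the column names and types, followed by table details such as storage location and format. The output below is truncated.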

In [49]:
spark.sql("describe formatted test1.dirty_store_transactions_tbl").show(200, False)

+----------------------------+-------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                      |comment|
+----------------------------+-------------------------------------------------------------------------------+-------+
|STORE_ID                    |string                                                                         |null   |
|STORE_LOCATION              |string                                                                         |null   |
|PRODUCT_CATEGORY            |string                                                                         |null   |
|PRODUCT_ID                  |string                                                                         |null   |
|MRP                         |double                                                                         |null   |
|CP                          |double            

### Explanation
This cell defines the JDBC connection URL for the PostgreSQL database `traindb`. The user name and password are embedded directly in the URL, which exposes them to anyone who can read the notebook.

In [42]:
# Write data to PostgreSQL
jdbcUrl = "jdbc:postgresql://localhost/traindb?user=train&password=Ankara06"
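The URL above embeds the database password in plain text. One common alternative, sketched here as an assumption rather than the notebook's method, is to keep credentials out of the URL and read them from environment variables. The function name and the variable names `PGUSER`/`PGPASSWORD` are this sketch's choices.

```python
import os

def postgres_jdbc_settings(host="localhost", database="traindb"):
    """Return a credential-free JDBC URL and a Spark JDBC properties dict (hypothetical helper).

    Assumes the password is in the PGPASSWORD environment variable and,
    optionally, the user name in PGUSER.
    """
    url = f"jdbc:postgresql://{host}/{database}"
    properties = {
        "user": os.environ.get("PGUSER", "train"),
        "password": os.environ["PGPASSWORD"],  # raises KeyError if not set
        "driver": "org.postgresql.Driver",
    }
    return url, properties

# Usage with the notebook's write:
# jdbcUrl, properties = postgres_jdbc_settings()
# df.write.jdbc(url=jdbcUrl, table="clean_transactions",
#               mode="overwrite", properties=properties)
```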

### Explanation
This cell writes the DataFrame to the PostgreSQL table `clean_transactions` over JDBC, replacing the table if it already exists, and prints the elapsed time.

In [45]:
# Write data to PostgreSQL
start_time = time.time()

df.write \
.jdbc(url= jdbcUrl, 
      table="clean_transactions", 
      mode="overwrite", 
      properties={"driver":"org.postgresql.Driver"})

print("--- %s seconds ---" %(time.time() - start_time))

--- 9.882255554199219 seconds ---
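The write cells repeat the same `start_time = time.time()` / `print(...)` pattern. A small context manager can factor it out; this is a hypothetical helper, not part of the original notebook. It uses `time.perf_counter`, which is monotonic and better suited to measuring intervals than `time.time`.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label):
    """Print how long the wrapped block took (hypothetical helper)."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Printed even if the wrapped write raises an exception.
        print(f"--- {label}: {time.perf_counter() - start:.2f} seconds ---")

# Usage with the notebook's JDBC write:
# with timed("JDBC write"):
#     df.write.jdbc(url=jdbcUrl, table="clean_transactions", mode="overwrite",
#                   properties={"driver": "org.postgresql.Driver"})
```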


### Explanation
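This cell reads the `public.clean_transactions` table back from PostgreSQL to check the write. The `query` string (with `LIMIT 3`) is defined but never used: `spark.read.jdbc` loads the entire table, and `show()` prints the first 20 rows by default. The output below is truncated.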

In [53]:
query = """
SELECT *
 FROM public.clean_transactions limit 3;
"""

properties = {
    "user": "train",
    "password": "Ankara06",
    "driver": "org.postgresql.Driver"
}

df_q = spark.read.jdbc(
    url=jdbcUrl,
    table="public.clean_transactions",
    properties=properties
)

df_q.show()

+--------+--------------+----------------+----------+----+-----+--------+-----+----------+
|STORE_ID|STORE_LOCATION|PRODUCT_CATEGORY|PRODUCT_ID| MRP|   CP|DISCOUNT|   SP|      Date|
+--------+--------------+----------------+----------+----+-----+--------+-----+----------+
|  YR7220|      New York|     Electronics|  12254943|31.0|20.77|    1.86|29.14|2019-11-26|
|  YR7220|      New York|       Furniture| 72619323C|15.0| 9.75|     1.5| 13.5|2019-11-26|
|  YR7220|      New York|     Electronics| 34161682B|88.0|62.48|     4.4| 83.6|2019-11-26|
|  YR7220|      New York|         Kitchen|  79411621|91.0|58.24|    3.64|87.36|2019-11-26|
|  YR7220|      New York|         Fashion| 39520263T|85.0| 51.0|    2.55|82.45|2019-11-26|
|  YR7220|      New York|         Kitchen|  93809204|37.0|24.05|    0.74|36.26|2019-11-26|
|  YR7220|      New York|       Cosmetics| 86610412D|80.0| 48.8|     6.4| 73.6|2019-11-26|
|  YR7220|      New York|         Kitchen| 52503356^|71.0| 42.6|    5.68|65.32|2019-11-26|
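To have PostgreSQL run the `LIMIT 3` query instead of transferring the whole table, Spark's JDBC source accepts a `query` option in place of a table name. Spark wraps that query in a subquery, so the trailing `;` in the notebook's `query` string must be removed. The helper below is a hypothetical sketch of this approach.

```python
def read_jdbc_query(spark, url, query, properties):
    """Read the result of `query` over JDBC, letting the database apply filters and LIMIT."""
    # Spark places the query inside a FROM (...) subquery; a trailing ';' would break it.
    cleaned = query.strip().rstrip(";").strip()
    return (spark.read
            .format("jdbc")
            .option("url", url)
            .option("query", cleaned)
            .options(**properties)  # user, password, driver
            .load())

# Usage with the notebook's variables:
# df_q = read_jdbc_query(spark, jdbcUrl, query, properties)
# df_q.show()
```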

### Explanation
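This cell writes the cleaned DataFrame to HDFS at `/user/train/spark_odev_transaction` as Parquet with Snappy compression, overwriting any existing output, and prints the elapsed time.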

In [54]:
# Write to HDFS as Parquet with Snappy compression
start_time = time.time()

df.write \
.format("parquet") \
.mode("overwrite") \
.option("compression", "snappy") \
.save("hdfs://localhost:9000/user/train/spark_odev_transaction")

print("---------- %s secs ----------" %(time.time()- start_time))

---------- 32.50256967544556 secs ----------


### Explanation
This cell lists the output directory with the HDFS shell to confirm the write: it contains the empty `_SUCCESS` marker and a single Snappy-compressed Parquet part file of 246,102 bytes.

In [55]:
! hdfs dfs -ls /user/train/spark_odev_transaction

Found 2 items
-rw-r--r--   1 train supergroup          0 2026-01-06 16:49 /user/train/spark_odev_transaction/_SUCCESS
-rw-r--r--   1 train supergroup     246102 2026-01-06 16:49 /user/train/spark_odev_transaction/part-00000-a72f19a3-3706-46d4-900f-e6dc55d689a3-c000.snappy.parquet
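In a scheduled script rather than a notebook, the same check can be automated by parsing the listing. The sketch below is a hypothetical helper that assumes the default `hdfs dfs -ls` column layout (permissions, replication, owner, group, size, date, time, path) and paths without spaces; the `_SUCCESS` file is the marker Hadoop's output committer writes when a job completes.

```python
def summarize_hdfs_ls(listing: str):
    """Return (has_success_marker, part_file_count, total_part_bytes) for `hdfs dfs -ls` text."""
    has_success, parts, total = False, 0, 0
    for line in listing.splitlines():
        fields = line.split()
        if len(fields) < 8:  # skips "Found N items" and blank lines
            continue
        size, path = int(fields[4]), fields[7]
        name = path.rsplit("/", 1)[-1]
        if name == "_SUCCESS":
            has_success = True
        elif name.startswith("part-"):
            parts += 1
            total += size
    return has_success, parts, total

# The listing shown above.
listing = """Found 2 items
-rw-r--r--   1 train supergroup          0 2026-01-06 16:49 /user/train/spark_odev_transaction/_SUCCESS
-rw-r--r--   1 train supergroup     246102 2026-01-06 16:49 /user/train/spark_odev_transaction/part-00000-a72f19a3-3706-46d4-900f-e6dc55d689a3-c000.snappy.parquet"""
print(summarize_hdfs_ls(listing))
```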


### Explanation
This cell stops the `SparkSession` and releases its cluster resources.

In [56]:
spark.stop()