In [1]:
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from datetime import datetime
from pyspark.sql.functions import col, trim, regexp_replace

In [2]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Read CSV from HDFS") \
    .config("spark.driver.memory", "8g") \
    .config("spark.hadoop.fs.defaultsFS", "hdfs://localhost:9000") \
    .getOrCreate()

# Read CSV file from HDFS
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("escape", "\"") \
    .load("hdfs://localhost:9000/csv_data/train_data.csv")

df.show(20)

+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Customer Name|    Segment|      Country|           City|         State|Postal Code| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|
+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+
|     1|CA-2017-152156|08/11/2017|11/11/2017|  Second Class|   CG-12520|       Claire Gute|   Consumer|United States|      Henderson|      Kentucky|      42420|  South|FUR-BO-10001798|      Furniture|   Bookcases|Bush Somerset Col...|  261.96|
|     2|CA-2017-152156|0

In [3]:
df.write.format("parquet").mode("overwrite").save("hdfs://localhost:9000/csv_data/temp_parquet")
parquet_df = spark.read.format("parquet").load("hdfs://localhost:9000/csv_data/temp_parquet")
pandas_df = parquet_df.toPandas()
print(pandas_df.head())

   Row ID        Order ID  Order Date   Ship Date       Ship Mode Customer ID  \
0       1  CA-2017-152156  08/11/2017  11/11/2017    Second Class    CG-12520   
1       2  CA-2017-152156  08/11/2017  11/11/2017    Second Class    CG-12520   
2       3  CA-2017-138688  12/06/2017  16/06/2017    Second Class    DV-13045   
3       4  US-2016-108966  11/10/2016  18/10/2016  Standard Class    SO-20335   
4       5  US-2016-108966  11/10/2016  18/10/2016  Standard Class    SO-20335   

     Customer Name    Segment        Country             City       State  \
0      Claire Gute   Consumer  United States        Henderson    Kentucky   
1      Claire Gute   Consumer  United States        Henderson    Kentucky   
2  Darrin Van Huff  Corporate  United States      Los Angeles  California   
3   Sean O'Donnell   Consumer  United States  Fort Lauderdale     Florida   
4   Sean O'Donnell   Consumer  United States  Fort Lauderdale     Florida   

   Postal Code Region       Product ID         Cat

In [4]:
pandas_df.info()

# generate preprocessing.csv from panda_df
pandas_df.to_csv('preprocessing.csv', index=False)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9800 entries, 0 to 9799
Data columns (total 18 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   Row ID         9800 non-null   int32  
 1   Order ID       9800 non-null   object 
 2   Order Date     9800 non-null   object 
 3   Ship Date      9800 non-null   object 
 4   Ship Mode      9800 non-null   object 
 5   Customer ID    9800 non-null   object 
 6   Customer Name  9800 non-null   object 
 7   Segment        9800 non-null   object 
 8   Country        9800 non-null   object 
 9   City           9800 non-null   object 
 10  State          9800 non-null   object 
 11  Postal Code    9789 non-null   float64
 12  Region         9800 non-null   object 
 13  Product ID     9800 non-null   object 
 14  Category       9800 non-null   object 
 15  Sub-Category   9800 non-null   object 
 16  Product Name   9800 non-null   object 
 17  Sales          9800 non-null   float64
dtypes: float

In [5]:
# Convert postal code from XXXXX.0 to XXXXX
pandas_df['Postal Code'] = pandas_df['Postal Code'].apply(lambda x: str(x).split('.')[0] if pd.notnull(x) else x)

print(pandas_df['Postal Code'].head())

# Fill missing values with 0 before converting to integers
pandas_df['Postal Code'] = pandas_df['Postal Code'].fillna(0).astype(int)

print(pandas_df['Postal Code'].head())

0    42420
1    42420
2    90036
3    33311
4    33311
Name: Postal Code, dtype: object
0    42420
1    42420
2    90036
3    33311
4    33311
Name: Postal Code, dtype: int64


In [6]:
pandas_df['Order Date'] = pd.to_datetime(pandas_df['Order Date'], dayfirst=True)
pandas_df['Ship Date'] = pd.to_datetime(pandas_df['Ship Date'], dayfirst=True)

print(pandas_df['Order Date'].head())
print(pandas_df['Ship Date'].head())

0   2017-11-08
1   2017-11-08
2   2017-06-12
3   2016-10-11
4   2016-10-11
Name: Order Date, dtype: datetime64[ns]
0   2017-11-11
1   2017-11-11
2   2017-06-16
3   2016-10-18
4   2016-10-18
Name: Ship Date, dtype: datetime64[ns]


In [23]:
# Remove trailing whitespaces from columns
pandas_df.columns = pandas_df.columns.str.strip()

# Remove non-breaking spaces from columns
pandas_df.columns = pandas_df.columns.str.replace(u'\xa0', u' ')
pandas_df = pandas_df.replace({"\u00A0": " "}, regex=True)

# Generate output1.csv from panda_df
pandas_df.to_csv('output_cleaned_fix.csv', index=False)

pandas_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9800 entries, 0 to 9799
Data columns (total 18 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   Row ID         9800 non-null   int64         
 1   Order ID       9800 non-null   object        
 2   Order Date     9800 non-null   datetime64[ns]
 3   Ship Date      9800 non-null   datetime64[ns]
 4   Ship Mode      9800 non-null   object        
 5   Customer ID    9800 non-null   object        
 6   Customer Name  9800 non-null   object        
 7   Segment        9800 non-null   object        
 8   Country        9800 non-null   object        
 9   City           9800 non-null   object        
 10  State          9800 non-null   object        
 11  Postal Code    9800 non-null   int64         
 12  Region         9800 non-null   object        
 13  Product ID     9800 non-null   object        
 14  Category       9800 non-null   object        
 15  Sub-Category   9800 n

In [49]:
# Create SparkSession
spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .config("spark.jars", "C:/mysql_connector/mysql-connector-java-8.0.26.jar") \
    .getOrCreate()

In [50]:
# Convert Pandas DataFrame to Spark DataFrame
df_upload = spark.createDataFrame(pandas_df)

In [51]:

# Fact Table: sales
fact_sales = df_upload.select(
    "Order ID", "Customer ID", "Product ID", "Sales"
)

# Dimension Table: t_customer
dim_customer = df_upload.select(
    "Customer ID", "Customer Name", "Segment"
)

# Dimension Table: t_product
dim_product = df_upload.select(
    "Product ID", "Category", "Sub-Category", "Product Name"
)

# Dimension Table: t_location
dim_location = df_upload.select(
    "Postal Code", "City", "State", "Region", "Country"
)

# Dimension Table: t_order
dim_order = df_upload.select(
    "Order ID", "Order Date", "Ship Date", "Ship Mode"
)


In [54]:
# Write fact_sales to MySQL
fact_sales.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3307/dbsales") \
    .option("dbtable", "sales") \
    .option("user", "root") \
    .option("password", "admin") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("overwrite") \
    .save()

# Write dim_customer to MySQL
dim_customer.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3307/dbsales") \
    .option("dbtable", "t_customer") \
    .option("user", "root") \
    .option("password", "admin") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("overwrite") \
    .save()

# Write dim_product to MySQL
dim_product.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3307/dbsales") \
    .option("dbtable", "t_product") \
    .option("user", "root") \
    .option("password", "admin") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("overwrite") \
    .save()

# Write dim_location to MySQL
dim_location.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3307/dbsales") \
    .option("dbtable", "t_location") \
    .option("user", "root") \
    .option("password", "admin") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("overwrite") \
    .save()

# Write dim_order to MySQL
dim_order.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3307/dbsales") \
    .option("dbtable", "t_order") \
    .option("user", "root") \
    .option("password", "admin") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("overwrite") \
    .save()


Py4JJavaError: An error occurred while calling o545.save.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:254)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:258)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [51]:
# Create DataFrame 
df = spark.createDataFrame(pandas_df)

In [None]:
# Show schema
df.printSchema()

In [None]:
# save processed data to HDFS
processed_df = spark.createDataFrame(pandas_df)
processed_df.write.format("csv").mode("overwrite").save("hdfs://localhost:9000/csv_data/processed_data.csv")

In [None]:
from pyspark.sql import SparkSession
from delta import *

# Convert pandas dataframe back to Spark dataframe
data = spark.createDataFrame(pandas_df)

# Membuat Spark session dengan Delta Lake
spark = SparkSession.builder \
    .appName("DeltaLakeExample") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0") \
    .getOrCreate()

# Membaca data dan menulis ke Delta
#data = spark.read.csv("data.csv", header=True, inferSchema=True)
data.write.format("delta").save("hdfs://localhost:9000/csv_data/")


In [None]:
import subprocess

# Define paths
hdfs_path = "hdfs://localhost:9000/csv_data/"
csv_file = "/csv_data/part-00000*.csv"
new_file = "/csv_data/delta_lake.csv"

# Rename using HDFS shell commands
subprocess.run(["hdfs", "dfs", "-mv", hdfs_path + csv_file, hdfs_path + new_file])


Membuat Delta Lake 

In [22]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder \
    .appName("PySpark MySQL Connection") \
    .config("spark.jars", "C:\mysql-connector-j-9.1.0.jar") \
    .getOrCreate()
