In [1]:
import os 
import socket
from pyspark.sql import SparkSession,SQLContext
from pyspark import SparkConf,SparkContext

# Set up the Spark configuration: standalone master URL, executor/driver
# memory, the HDFS default filesystem, and the MySQL Connector/J jar.
conf = (
    SparkConf()
    .setAppName("write csv to mysql")
    .setMaster("spark://spark-master:7077")
    .set("spark.executor.memory", "8g")
    .set("spark.driver.memory", "8g")
    .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000")
    .set("spark.jars", "/opt/spark/jars/mysql-connector-java-8.0.30.jar")
)

# getOrCreate() returns an already-running session if one exists; otherwise
# it builds a new one from `conf`.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Report the master the session is actually attached to. The configured
# master is a standalone cluster URL, so "running locally" would be wrong.
print(f"Spark application is running on master: {sc.master}")
print(sc.getConf().get("spark.jars"))


2024-09-18 22:02:50,572 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark application is running locally
/opt/spark/jars/mysql-connector-java-8.0.30.jar


In [2]:
local_file_path = "./customer_info.csv"
# Resolve the CSV location against the current working directory.
absolute_path = os.path.abspath(local_file_path)

# Look up the IP address of this machine's hostname (printed for diagnostics only).
host_address = socket.gethostbyname(socket.gethostname())
print(f"Absolute file path: {absolute_path}")
print(f"Host address: {host_address}")

# Read the file into a DataFrame using the path resolved above, instead of a
# second hard-coded copy of it. The explicit file:// scheme is needed because
# the session's default filesystem is configured as HDFS.
# header=False loads the header line as an ordinary row with columns named
# _c0.._c9; the next cell filters that row out by its _c0 value.
# NOTE(review): with the header text present in every column, inferSchema
# presumably infers all columns as strings -- confirm with df.printSchema().
df = spark.read.csv(f"file://{absolute_path}", header=False, inferSchema=True)
df = df.coalesce(1)  # Reduce to 1 partition for small data
df.show(5)

Absolute file path: /mnt/data/data/customer_info.csv
Host address: 172.19.0.14


[Stage 1:>                                                          (0 + 1) / 1]

+-----------+------------+------------+--------------------+---------+------------------+-------------------+--------+-------------------+-------------------+
|        _c0|         _c1|         _c2|                 _c3|      _c4|               _c5|                _c6|     _c7|                _c8|                _c9|
+-----------+------------+------------+--------------------+---------+------------------+-------------------+--------+-------------------+-------------------+
|customer_id|        name|phone_number|               email|  address|    id_card_number|  registration_date|  status|         created_at|         updated_at|
|  CUST_0001|Mona Solberg| +4792486972|customer_1@exampl...|Address_1|960169599320495002|2023-08-05 06:07:46|Inactive|2023-08-05 06:07:46|2023-08-05 06:07:46|
|  CUST_0002|   Erik Berg| +4714357923|customer_2@exampl...|Address_2|890342987726267449|2021-04-10 14:26:01|Inactive|2021-04-10 14:26:01|2021-04-10 14:26:01|
|  CUST_0003|   Erik Berg| +4722304115|custome

                                                                                

In [5]:
# The CSV was read with header=False, so its header line is present as a data
# row whose first column holds the literal text "customer_id"; drop that row.
df_filtered = df.where(df._c0 != "customer_id")
df_filtered.show(1)

+---------+------------+-----------+--------------------+---------+------------------+-------------------+--------+-------------------+-------------------+
|      _c0|         _c1|        _c2|                 _c3|      _c4|               _c5|                _c6|     _c7|                _c8|                _c9|
+---------+------------+-----------+--------------------+---------+------------------+-------------------+--------+-------------------+-------------------+
|CUST_0001|Mona Solberg|+4792486972|customer_1@exampl...|Address_1|960169599320495002|2023-08-05 06:07:46|Inactive|2023-08-05 06:07:46|2023-08-05 06:07:46|
+---------+------------+-----------+--------------------+---------+------------------+-------------------+--------+-------------------+-------------------+
only showing top 1 row



In [7]:
mysql_url = "jdbc:mysql://mysql:3306/my_database"

# JDBC connection properties. Credentials are taken from the environment
# (MYSQL_USER / MYSQL_PASSWORD) so they need not live in the notebook; the
# fallbacks keep the previous behaviour when the variables are unset.
# TODO: remove the fallback values once the environment is configured.
mysql_properties = {
    "user": os.environ.get("MYSQL_USER", "my_user"),
    "password": os.environ.get("MYSQL_PASSWORD", "root"),
    "driver": "com.mysql.cj.jdbc.Driver",
}

# Replace the positional _c0.._c9 column names with the real column names.
customer_columns = [
    "customer_id", "name", "phone_number", "email", "address",
    "id_card_number", "registration_date", "status", "created_at", "updated_at",
]
df_filtered = df_filtered.toDF(*customer_columns)

# Write the DataFrame to the customer_info table.
# NOTE: mode="append" adds rows to the existing table, so re-running this
# cell inserts the same customers again.
df_filtered.write.jdbc(
    url=mysql_url,
    table="customer_info",
    mode="append",
    properties=mysql_properties,
)

# Read the table back over the same connection to check the result.
df_mysql = spark.read.jdbc(url=mysql_url, table="customer_info", properties=mysql_properties)
df_mysql.show(5)



+-----------+-------------+------------+--------------------+---------+------------------+-------------------+--------+-------------------+-------------------+
|customer_id|         name|phone_number|               email|  address|    id_card_number|  registration_date|  status|         created_at|         updated_at|
+-----------+-------------+------------+--------------------+---------+------------------+-------------------+--------+-------------------+-------------------+
|  CUST_0001| Mona Solberg| +4792486972|customer_1@exampl...|Address_1|960169599320495002|2023-08-05 06:07:46|Inactive|2023-08-05 06:07:46|2023-08-05 06:07:46|
|  CUST_0002|    Erik Berg| +4714357923|customer_2@exampl...|Address_2|890342987726267449|2021-04-10 14:26:01|Inactive|2021-04-10 14:26:01|2021-04-10 14:26:01|
|  CUST_0003|    Erik Berg| +4722304115|customer_3@exampl...|Address_3|248359464321612103|2022-11-28 05:25:28|  Active|2022-11-28 05:25:28|2022-11-28 05:25:28|
|  CUST_0004| Mona Solberg| +4783868647|