In [1]:
# I'm using Ubuntu 20.04, Python 3.8.10, and Spark 3.4.0

In [None]:
import sqlite3
import time
import pyspark
import os
import glob

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, TimestampType

from pyspark.sql.functions import (
    concat,
    col, 
    lit, 
    row_number, 
    monotonically_increasing_id, 
    to_date, 
    to_utc_timestamp, 
    date_format, 
    lpad,
    from_unixtime,
    unix_timestamp,
    to_timestamp,
    expr
)

from pyspark.sql.window import Window

In [2]:
# Wall-clock timestamp (seconds since the epoch) taken at the start of the run.
start_time = time.time()
# Report the installed PySpark version so the run environment is visible in the output.
print(f"Spark version: {pyspark.__version__}")

Spark version: 3.4.0


In [3]:
# Spark configuration for a local, single-JVM run.
# - spark.driver.extraClassPath: extra jars put on the driver classpath.
#   NOTE: this is a machine-specific absolute path; adjust it for your environment.
# - spark.jars.packages: Maven coordinates resolved at startup; here the SQLite
#   JDBC driver used later for reading/writing the database.
conf = (
    SparkConf()
    .setAppName("Load CSV to SQLITE")
    .setMaster("local")
    .set("spark.driver.extraClassPath", "/home/nicolasterroni/projects/spark/jars/*")
    .set("spark.jars.packages", "org.xerial:sqlite-jdbc:3.40.0.0")
)

# Create the session through the builder (the documented entry point) rather than
# calling the SparkSession constructor directly. getOrCreate() returns the already
# active session when this cell is re-run instead of building another one.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Keep the SparkContext handle available under its original name.
sc = spark.sparkContext

23/04/29 18:49:51 WARN Utils: Your hostname, nicolasterroni-asus-tuf resolves to a loopback address: 127.0.1.1; using 192.168.0.35 instead (on interface enp2s0)
23/04/29 18:49:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/nicolasterroni/Snoop/snoop-etls/etl/venv/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/nicolasterroni/.ivy2/cache
The jars for the packages stored in: /home/nicolasterroni/.ivy2/jars
org.xerial#sqlite-jdbc added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6bf8984e-38c9-4552-97b7-110cf0d1343d;1.0
	confs: [default]
	found org.xerial#sqlite-jdbc;3.40.0.0 in central
:: resolution report :: resolve 111ms :: artifacts dl 4ms
	:: modules in use:
	org.xerial#sqlite-jdbc;3.40.0.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-6bf8984e-38c9-4552-97b7-110cf0d1343

In [None]:
# Directory containing the input CSV (machine-specific; adjust to your setup).
main_path = '/home/nicolasterroni/projects/spark/examples/csv_cleaning/'

# File name of the CSV inside main_path.
csv_name = 'test.csv'

# Join with os.path.join so the path is correct whether or not main_path
# ends with a trailing slash (plain string concatenation is not).
csv_path = os.path.join(main_path, csv_name)

# Read the CSV with a header row; make sure the delimiter matches the file.
df = (
    spark.read.format('csv')
    .option('header', True)
    .option('delimiter', '|')
    .load(csv_path)
)
df.printSchema()

In [None]:
# Preview the first three rows of the loaded data (returned as a list of Row objects).
df.take(3)

In [None]:
# DATA MODELING

# example of joining 2 dataframes
# df = df.join(df2, on="id", how="inner")

# delete column
# df = df.drop("column")

# converting 'revenue' column from a string like '55.000,00' to float
# (a Column has no .replace() method, so use the SQL replace() function via expr)
#df = df.withColumn('revenue', expr("replace(revenue, '.', '')"))
#df = df.withColumn('revenue', expr("replace(revenue, ',', '.')"))
#df = df.withColumn('revenue', col('revenue').cast('float'))

# check the transformed data
# df.printSchema()
# df.head(3)


In [None]:
# DATA CLEANING

# examples of changing dates and datetimes format
#df = df.withColumn("date", from_unixtime(unix_timestamp(df["date"], "d/M/yyyy"), "yyyy-MM-dd"))
#df = df.withColumn("datetime", from_unixtime(unix_timestamp(df["datetime"], "d/M/yyyy HH:mm:ss"), "yyyy-MM-dd HH:mm:ss"))

# example of string manipulation: getting a substring
#df = df.withColumn("string", expr("substring(string, 1, length(string) - 4)"))

# convert string to timestamp
#df = df.withColumn("datetime", to_timestamp(df["datetime"], "yyyy/MM/dd HH:mm:ss"))

# check the transformed data
# df.printSchema()
# df.head(3)

In [None]:
# Expected columns of the target table. This StructType is not passed to Spark
# below (the table is created with plain SQL and written from df's own schema);
# it is kept as a reference for the table layout.
schema = StructType([
    StructField("name", StringType()),
    StructField("age", StringType()),
])

# Single source of truth for the SQLite file, so that the sqlite3 connection and
# the Spark JDBC URL always point at the SAME database.
db_path = '/your/sqlite/location/test.db'
url = f'jdbc:sqlite:{db_path}'
table_name = 'test'

# 'overwrite' replaces the existing table contents. By default Spark drops and
# recreates the table for this mode rather than truncating it.
mode = 'overwrite'

# JDBC connection properties shared by the write and the read-back below.
properties = {
    'driver': 'org.sqlite.JDBC',
}

# Create the table up front if it is missing, then always release the connection
# so no handle on the database file is left open while Spark writes to it.
conn = sqlite3.connect(db_path)
try:
    print("Connected to sqlite db")

    # create table if it does not exist
    conn.execute(f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        name TEXT,
        age TEXT
    )
""")
    conn.commit()

    print("Database created")
finally:
    conn.close()

# write (mode is given once; the driver comes from `properties`)
df.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)
print(f"Written data: {table_name} in database.")

# check the written data by reading the table back through the same URL/driver
read_df = spark.read.jdbc(url=url, table=table_name, properties=properties)
print("Actual data from database:")
read_df.show()
