In [1]:
#import necessary libraries
import os

import psycopg2
from pyspark.sql import DataFrameWriter
from pyspark.sql import SparkSession
# Build (or reuse) a local Spark session that uses every available core and
# has the PostgreSQL JDBC driver jar registered via spark.jars.
spark = (
    SparkSession.builder
    .appName('finance')
    .master("local[*]")
    .config('spark.jars', 'postgresql-42.7.3.jar')
    .getOrCreate()
)

In [2]:
# Read weekly_adjusted_IBM.csv: the first row supplies the column names and
# Spark infers each column's type from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(r'weekly_adjusted_IBM.csv')
)
df.show()

+----------+-------+-------+--------+------+--------------+--------+---------------+
| timestamp|   open|   high|     low| close|adjusted close|  volume|dividend amount|
+----------+-------+-------+--------+------+--------------+--------+---------------+
|2025-04-04| 242.74| 252.79|  226.88|227.48|        227.48|28005665|            0.0|
|2025-03-28| 247.31| 254.32|  242.07| 244.0|         244.0|18354282|            0.0|
|2025-03-21| 249.25| 254.63| 237.224|243.87|        243.87|27866866|            0.0|
|2025-03-14| 261.56| 266.45|  241.68|248.35|        248.35|25513710|            0.0|
|2025-03-07|254.735| 261.96|245.1823|261.54|        261.54|22284160|            0.0|
|2025-02-28|  261.5|263.845|  246.54|252.44|        252.44|25541761|            0.0|
|2025-02-21| 261.93| 265.09|  259.83|261.48|        261.48|18534169|            0.0|
|2025-02-14| 250.86| 261.94|  246.87|261.28|        261.28|19898073|           1.67|
|2025-02-07|  252.4| 265.72|  251.84|252.34|      250.6607|301498

In [4]:
#check null values
# Count NULLs for every column in ONE Spark job. The previous per-column
# filter(...).count() loop triggered a separate full scan of the CSV for each
# column.
null_flags = df.select(
    # 1 where the value is NULL, 0 otherwise, keeping the original column name
    [df[name].isNull().cast('int').alias(name) for name in df.columns]
)
# groupBy() with no keys aggregates the whole frame into a single row whose
# values are in the same order as df.columns.
null_totals = null_flags.groupBy().sum().first()
for name, total in zip(df.columns, null_totals):
    # sum() yields None when the DataFrame has no rows; report that as 0
    print(name, ' NULLS = ', total or 0)

timestamp  NULLS =  0
open  NULLS =  0
high  NULLS =  0
low  NULLS =  0
close  NULLS =  0
adjusted close  NULLS =  0
volume  NULLS =  0
dividend amount  NULLS =  0


In [5]:
# Print each column's name, data type and nullability as a tree.
df.printSchema()

root
 |-- timestamp: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- adjusted close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- dividend amount: double (nullable = true)



In [8]:
from pyspark.sql.functions import col
# Order all rows by the timestamp column, most recent first.
df = df.sort(col("timestamp").desc())

In [9]:
# Rename the timestamp column to date, then preview the first three rows.
df = df.withColumnRenamed(existing='timestamp', new='date')
df.show(n=3)

+----------+------+------+-------+------+--------------+--------+---------------+
|      date|  open|  high|    low| close|adjusted close|  volume|dividend amount|
+----------+------+------+-------+------+--------------+--------+---------------+
|2025-04-04|242.74|252.79| 226.88|227.48|        227.48|28005665|            0.0|
|2025-03-28|247.31|254.32| 242.07| 244.0|         244.0|18354282|            0.0|
|2025-03-21|249.25|254.63|237.224|243.87|        243.87|27866866|            0.0|
+----------+------+------+-------+------+--------------+--------+---------------+
only showing top 3 rows



In [16]:
# Replace spaces with underscores in every column name so each one is a plain
# SQL identifier: 'adjusted close' -> 'adjusted_close' and
# 'dividend amount' -> 'dividend_amount'.
# Previously only 'adjusted close' was renamed, which left 'dividend amount'
# unable to match the stock table's dividend_amount column on the JDBC append.
df = df.toDF(*[name.replace(' ', '_') for name in df.columns])

In [17]:
# Database connection parameters, including the database name.
# Each value may be overridden with a VANTAGE_DB_* environment variable so
# credentials do not have to live in the notebook; the literals are only
# local-development fallbacks.
# NOTE(review): the fallback password is still committed in plain text --
# rotate it and remove the fallback once VANTAGE_DB_PASSWORD is configured.
db_params = {
    'username': os.environ.get('VANTAGE_DB_USER', 'postgres'),
    'password': os.environ.get('VANTAGE_DB_PASSWORD', 'ahly9667'),
    'host': os.environ.get('VANTAGE_DB_HOST', 'localhost'),
    'port': os.environ.get('VANTAGE_DB_PORT', '5432'),
    'database': os.environ.get('VANTAGE_DB_NAME', 'vantage')
}


def db_connected(params=None):
    """Open a new psycopg2 connection to the configured PostgreSQL database.

    Args:
        params: Optional mapping with the keys 'username', 'password',
            'host', 'port' and 'database'. Defaults to the module-level
            ``db_params`` so the settings are defined in exactly one place.

    Returns:
        A new, open psycopg2 connection. The caller is responsible for
        closing it.
    """
    cfg = db_params if params is None else params
    connection = psycopg2.connect(user=cfg['username'],
                                  host=cfg['host'],
                                  password=cfg['password'],
                                  port=int(cfg['port']),
                                  database=cfg['database'])

    return connection


# Open a connection up front to verify the settings work.
conn = db_connected()
print(f"Database {db_params['database']} connected successfully")

Database vantage connected successfully


In [14]:
#create a function to create related tables of schema
def create_table():
    """Drop and recreate the ``stock`` table.

    The column names mirror the DataFrame that is appended over JDBC, so the
    volume column is spelled ``volume`` (it was previously misspelled
    ``volumn``, which the DataFrame's ``volume`` column could not match).

    Opens its own connection through ``db_connected()`` and always closes it,
    even when the DDL fails.
    """
    query = """
                 drop table if exists stock;

                 create table stock(
                 date date,
                 open float,
                 high float,
                 low float,
                 close float,
                 adjusted_close float,
                 volume bigint,
                 dividend_amount float)
                 
                """
    conn = db_connected()
    try:
        # `with conn` commits on success and rolls back if execute() raises;
        # `with conn.cursor()` closes the cursor on exit.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(query)
    finally:
        # psycopg2's connection context manager ends the transaction but does
        # not close the connection, so close it explicitly.
        conn.close()

create_table()
print('Tables created successfully')

Tables created successfully


In [18]:
#load data into tables
# Build the JDBC target from db_params instead of repeating the host, port,
# database name and credentials as literals a third time.
my_url = f"jdbc:postgresql://{db_params['host']}:{db_params['port']}/{db_params['database']}"
my_properties = {'user': db_params['username'],
                 'password': db_params['password'],
                 'driver': 'org.postgresql.Driver'}

# mode='append' inserts the rows into the stock table rather than replacing it.
df.write.jdbc(url=my_url, table='stock', mode='append', properties=my_properties)