In [1]:
# ! pip install psycopg2
# ! pip install psycopg2-binary
#! pip install python-dotenv

In [1]:
import psycopg2
from dotenv import load_dotenv
import os
import pandas as pd

In [2]:
CURR_DIR=os.getcwd()
#print(CURR_DIR)
env_file_path=os.path.join(CURR_DIR, '.env')
#print(env_file_path)

# load the environment variables
load_dotenv(env_file_path)
env=os.environ

DBNAME=env.get('DBNAME')
USERNAME=env.get('USERNAME')
PASSWORD=env.get('PASSWORD')
HOSTNAME=env.get('HOSTNAME')
PORT=env.get('PORT')
SCHEMA=env.get('SCHEMA')
print(f"Schema : {SCHEMA}")
print(f"DBName: {DBNAME} HOSTNAME: {HOSTNAME}")

Schema : group_11
DBName: everything2023 HOSTNAME: pg.analytics.northwestern.edu


In [5]:
# Establish the connection
conn = psycopg2.connect(
    dbname=DBNAME,
    user=USERNAME,
    password=PASSWORD,
    host=HOSTNAME,
    port=PORT
)

# Create a cursor object to interact with the database
cur = conn.cursor()

# Setting the SearchPath to Group Schema
cur.execute(f"SET search_path TO {SCHEMA};")

In [6]:
# Getting Data from DeptInfo
cur.execute("SELECT * FROM deptinfo LIMIT 10;")
column_names = [desc[0] for desc in cur.description]
print(column_names)

rows = cur.fetchall()
df_deptinfo = pd.DataFrame(rows, columns=column_names)
df_deptinfo

['DEPT', 'DEPTDESC']


Unnamed: 0,DEPT,DEPTDESC
0,800,CLINIQUE
1,801,LESLIE
2,1100,GARY F
3,1107,JACQUES
4,1202,CABERN
5,1301,BE2
6,1704,R LAUREN
7,1905,R & Y
8,2102,CAB
9,2105,R TAYLOR


In [32]:
# Getting Data from SKSTINFO
cur.execute("SELECT * FROM skstinfo LIMIT 10;")
column_names = [desc[0] for desc in cur.description]
print(column_names)

rows = cur.fetchall()
df_skstinfo = pd.DataFrame(rows, columns=column_names)
df_skstinfo

['SKU', 'STORE', 'COST', 'RETAIL']


Unnamed: 0,SKU,STORE,COST,RETAIL
0,3,102,123.36,440.0
1,3,103,123.36,440.0
2,3,104,123.36,440.0
3,3,202,123.36,440.0
4,3,203,123.36,440.0
5,3,204,123.36,440.0
6,3,302,123.36,440.0
7,3,304,123.36,440.0
8,3,307,123.36,440.0
9,3,309,123.36,440.0


In [8]:
# Getting Data from SKUINFO
cur.execute("SELECT * FROM skuinfo LIMIT 10;")
column_names = [desc[0] for desc in cur.description]
print(column_names)

rows = cur.fetchall()
df_skuinfo = pd.DataFrame(rows, columns=column_names)
df_skuinfo

InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block


In [85]:
# Getting Data from STRINFO
cur.execute("SELECT * FROM strinfo LIMIT 10;")
column_names = [desc[0] for desc in cur.description]
print(column_names)

rows = cur.fetchall()
df_strinfo = pd.DataFrame(rows, columns=column_names)
df_strinfo

['STORE', 'CITY', 'STATE', 'ZIP']


Unnamed: 0,STORE,CITY,STATE,ZIP
0,2,ST. PETERSBURG,FL,33710
1,3,ST. LOUIS,MO,63126
2,4,LITTLE ROCK,AR,72201
3,7,FORT WORTH,TX,76137
4,9,TEMPE,AZ,85281
5,60,ASPEN,CO,81611
6,62,TULSA,OK,74110
7,65,SAN ANTONIO,TX,78218
8,67,DALLAS,TX,75207
9,100,SALISBURY,NC,28146


In [86]:
# Using JOIN BASED QUERY
QUERY1 = """
SELECT *
FROM skstinfo as x
inner join 
skuinfo as y 
on x."SKU" = y."SKU"
LIMIT 10;
"""
try:
    cur.execute(QUERY1)
    column_names = [desc[0] for desc in cur.description]
    print(column_names)

    rows = cur.fetchall()
    df_query1= pd.DataFrame(rows, columns=column_names)
    print(df_query1)
except psycopg2.Error as e:
    print(f"Error executing command: {e}")
    conn.rollback()  # Rollback the current transaction

['SKU', 'STORE', 'COST', 'RETAIL', 'SKU', 'DEPT', 'CLASSID', 'UPC', 'STYLE', 'COLOR', 'SIZE', 'PACKSIZE', 'VENDOR', 'BRAND']
    SKU  STORE  COST  RETAIL   SKU  DEPT CLASSID           UPC         STYLE   
0  1918   9806  15.0   19.00  1918  9105     111  400001918000  21B   S45KR4  \
1  4220    102   7.0    4.99  4220  2102     979  400004220000        N51202   
2  4220    103   7.0    9.99  4220  2102     979  400004220000        N51202   
3  4220    309   7.0    5.00  4220  2102     979  400004220000        N51202   
4  4220    502   7.0    5.00  4220  2102     979  400004220000        N51202   
5  4220    503   7.0    9.99  4220  2102     979  400004220000        N51202   
6  4220    603   7.0    9.99  4220  2102     979  400004220000        N51202   
7  4220    703   7.0    9.99  4220  2102     979  400004220000        N51202   
8  4220    802   7.0    5.00  4220  2102     979  400004220000        N51202   
9  4220    809   7.0    5.00  4220  2102     979  400004220000        N5120

In [22]:
cur.close()
conn.close()

In [11]:
%%writefile run_pyspark.py
from pyspark.sql import SparkSession
import psycopg2
from dotenv import load_dotenv
import os
import pandas as pd

CURR_DIR=os.getcwd()
#print(CURR_DIR)
env_file_path=os.path.join(CURR_DIR, '.env')
#print(env_file_path)

# load the environment variables
load_dotenv(env_file_path)
env=os.environ

DBNAME=env.get('DBNAME')
USERNAME=env.get('USERNAME')
PASSWORD=env.get('PASSWORD')
HOSTNAME=env.get('HOSTNAME')
PORT=env.get('PORT')
SCHEMA=env.get('SCHEMA')
print(f"Schema : {SCHEMA}")
print(f"DBName: {DBNAME} HOSTNAME: {HOSTNAME}")

JAR_PATH = "/Users/dare_devil/Desktop/MLDS_2024/Quarter1/Everything_Starts_With_Data/postgresql-42.6.0.jar"
# .config("spark.jars", JAR_PATH) \
def create_spark_session():
    spark = SparkSession.builder \
        .appName("PostgresApp") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.18") \
        .getOrCreate()
    return spark

def read_from_postgres(spark, jdbc_url, table_name):
    df = spark.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", table_name) \
        .option("user", USERNAME) \
        .option("password", PASSWORD) \
        .load()
    return df

if __name__ == "__main__":
    spark_session = create_spark_session()

    # Update these details
    jdbc_url = f"jdbc:postgresql://{HOSTNAME}:5432/{DBNAME}"
    table_name = "group_11.deptinfo"

    dataframe = read_from_postgres(spark_session, jdbc_url, table_name)
    dataframe.show()

Writing run_pyspark.py


In [10]:
os.path.exists(JAR_PATH)

True

In [4]:
from pyspark.sql import SparkSession
import warnings

warnings.filterwarnings('ignore')

import os

os.environ["PYSPARK_SUBMIT_ARGS"] = f"--jars  {spark_jar_path} pyspark-shell"


##.config("spark.executor.extraClassPath", spark_jar_path) \

spark = SparkSession.builder \
    .appName("PostgreSQL with PySpark in Jupyter") \
    .config("spark.jars", f"{spark_jar_path}") \
    .getOrCreate()

database_url = 
database_properties = {"user": USERNAME, "password": PASSWORD, "driver": "org.postgresql.Driver"}

23/10/23 10:43:29 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
df = spark.read.jdbc(database_url, "group_11.deptinfo", properties=database_properties)
df.show()

Py4JJavaError: An error occurred while calling o39.jdbc.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:34)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:249)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1623)


In [11]:
import findspark
findspark.init()

In [10]:
! pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
