PostgreSQL - PySpark
====================

# References
* [Data Engineering Helpers - Knowledge Sharing - PostgreSQL cheat sheet (this Git repository)](https://github.com/data-engineering-helpers/ks-cheat-sheets/tree/main/db/postgresql)
  + [PostgreSQL - PySpark SDK (this notebook)](https://github.com/data-engineering-helpers/ks-cheat-sheets/tree/main/db/postgresql/ipython-notebooks/postgresql-pyspark.ipynb)
  + [PostgreSQL - Python SDK (SQLAlchemy)](https://github.com/data-engineering-helpers/ks-cheat-sheets/tree/main/db/postgresql/ipython-notebooks/postgresql-python-sdk.ipynb)
* [Data Engineering Helpers - Knowledge Sharing - Jupyter, PySpark and DuckDB](https://github.com/data-engineering-helpers/ks-cheat-sheets/tree/main/programming/jupyter/jupyter-pyspark-duckdb/ipython-notebooks)

## Spark
* [Apache Spark doc - JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)

## Data
* Structure of the `country_info` table:
  [`create-geonames-tables.sql`](https://github.com/data-engineering-helpers/ks-cheat-sheets/blob/main/db/postgresql/sql/create-geonames-tables.sql)
* Source: [Geonames data](https://download.geonames.org/export/dump/)

In [1]:
import confmgr
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [2]:
# Spark configuration: ship the PostgreSQL JDBC driver and the Delta Lake jars with
# the session, and register the Delta SQL extension and catalog.
sparkconf = SparkConf().setAll([
    ("spark.jars", ",".join(["jars/postgresql-42.6.0.jar", "jars/delta-core_2.12-2.4.0.jar"])),
    ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
    ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
])

# Not needed when the kernel is a PySpark one (it already defines `spark`)
spark = SparkSession.builder.config(conf=sparkconf).getOrCreate()

23/09/07 13:39:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
%%sh
# List the local Geonames country data files
ls -l -F -h data/csv/

total 128
-rw-r--r--  1 DENIS  1000    28K Sep  6 19:25 countryInfo.csv
-rw-r--r--  1 DENIS  1000    31K Sep  6 19:08 countryInfo.txt


In [4]:
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType

# Spark schema of the Geonames country file. The file is read without a header row,
# so the columns are mapped positionally: keep this order aligned with the file.
# Every column is declared nullable.
ctry_columns = StructType()
for _col_name, _col_type in (
    ("iso_alpha2", StringType()),
    ("iso_alpha3", StringType()),
    ("iso_numeric", IntegerType()),
    ("fips_code", StringType()),
    ("name", StringType()),
    ("capital", StringType()),
    ("areainsqkm", DoubleType()),
    ("population", IntegerType()),
    ("continent", StringType()),
    ("tld", StringType()),
    ("currency_code", StringType()),
    ("currency_name", StringType()),
    ("phone", StringType()),
    ("postal_code_format", StringType()),
    ("postal_code_regex", StringType()),
    ("languages", StringType()),
    ("geonameId", IntegerType()),
    ("neighbours", StringType()),
    ("equivalent_fips_code", StringType()),
):
    # StructType.add appends the field in place (and returns the same StructType)
    ctry_columns.add(_col_name, _col_type, True)
del _col_name, _col_type

In [5]:
# Load the Geonames country data with the explicit `ctry_columns` schema.
# - The file is tab-separated and has no header row, so columns are mapped by position.
# - `inferSchema` is deliberately not set: Spark only infers a schema when none is
#   supplied, so combining it with `.schema(...)` was a misleading no-op.
# - `comment="#"` skips lines starting with '#', so the raw Geonames dump
#   (countryInfo.txt, whose header lines start with '#') can be read as well;
#   data rows start with an ISO code and are therefore never skipped.
countries_data_fp: str = "data/csv/countryInfo.csv"
ctry_df = (
    spark.read
    .schema(ctry_columns)
    .option("header", False)
    .option("sep", "\t")
    .option("comment", "#")
    .csv(countries_data_fp)
)
countries_df = ctry_df.toPandas()
assert not countries_df.empty, f"No country record could be read from {countries_data_fp}"
countries_df

                                                                                

Unnamed: 0,iso_alpha2,iso_alpha3,iso_numeric,fips_code,name,capital,areainsqkm,population,continent,tld,currency_code,currency_name,phone,postal_code_format,postal_code_regex,languages,geonameId,neighbours,equivalent_fips_code
0,AD,AND,20,AN,Andorra,Andorra la Vella,468.0,77006,EU,.ad,EUR,Euro,376,AD###,^(?:AD)*(\d{3})$,ca,3041565,"ES,FR",
1,AE,ARE,784,AE,United Arab Emirates,Abu Dhabi,82880.0,9630959,AS,.ae,AED,Dirham,971,,,"ar-AE,fa,en,hi,ur",290557,"SA,OM",
2,AF,AFG,4,AF,Afghanistan,Kabul,647500.0,37172386,AS,.af,AFN,Afghani,93,,,"fa-AF,ps,uz-AF,tk",1149361,"TM,CN,IR,TJ,PK,UZ",
3,AG,ATG,28,AC,Antigua and Barbuda,St. John's,443.0,96286,,.ag,XCD,Dollar,+1-268,,,en-AG,3576396,,
4,AI,AIA,660,AV,Anguilla,The Valley,102.0,13254,,.ai,XCD,Dollar,+1-264,,,en-AI,3573511,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
247,ZA,ZAF,710,SF,South Africa,Pretoria,1219912.0,57779622,AF,.za,ZAR,Rand,27,####,^(\d{4})$,"zu,xh,af,nso,en-ZA,tn,st,ts,ss,ve,nr",953987,"ZW,SZ,MZ,BW,NA,LS",
248,ZM,ZMB,894,ZA,Zambia,Lusaka,752614.0,17351822,AF,.zm,ZMW,Kwacha,260,#####,^(\d{5})$,"en-ZM,bem,loz,lun,lue,ny,toi",895949,"ZW,TZ,MZ,CD,NA,MW,AO",
249,ZW,ZWE,716,ZI,Zimbabwe,Harare,390580.0,14439018,AF,.zw,ZWL,Dollar,263,,,"en-ZW,sn,nr,nd",878675,"ZA,MZ,BW,ZM",
250,CS,SCG,891,YI,Serbia and Montenegro,Belgrade,102350.0,10829175,EU,.cs,RSD,Dinar,381,#####,^(\d{5})$,"cu,hu,sq,sr",8505033,"AL,HU,MK,RO,HR,BA,BG",


In [6]:
# PostgreSQL connection settings, as returned by the local `confmgr` helper module
db_cfg: dict = confmgr.get_db_conn_dict()

# Fail fast on missing mandatory settings: `dict.get()` would silently yield `None`,
# which only surfaces later as an obscure URL such as "jdbc:postgresql://None/None".
_missing_db_keys = [key for key in ("host", "dbname", "user") if not db_cfg.get(key)]
if _missing_db_keys:
    raise ValueError(f"Missing PostgreSQL setting(s) in the database configuration: {_missing_db_keys}")

pg_host: str = db_cfg["host"]
pg_port: int = db_cfg.get("port")  # optional: may be None when not configured
pg_dbname: str = db_cfg["dbname"]
pg_user: str = db_cfg["user"]
# May be empty in config.json (confmgr then reports falling back to ~/.pgpass)
pg_passwd: str = db_cfg.get("passwd")

In [7]:
# JDBC URL of the target database. The configured port was previously ignored, so any
# non-default port silently pointed at the wrong server port; include it when set,
# otherwise let the PostgreSQL JDBC driver use its default port.
jdbc_conn_str: str = (
    f"jdbc:postgresql://{pg_host}:{pg_port}/{pg_dbname}"
    if pg_port
    else f"jdbc:postgresql://{pg_host}/{pg_dbname}"
)

# Write the countries into the `country_info2` table.
# - Extra options such as `sslmode` are passed through as JDBC connection properties.
# - In "overwrite" mode (without `truncate`), Spark drops and re-creates an existing table,
#   so column types come from the Spark schema, not from create-geonames-tables.sql.
(ctry_df.write
 .format("jdbc")
 .option("driver", "org.postgresql.Driver")
 .option("url", jdbc_conn_str)
 .option("dbtable", "country_info2")
 .option("user", pg_user)
 .option("password", pg_passwd)
 .option("sslmode", "disable")
 .mode("overwrite")
 .save()
)

In [8]:
# Read the freshly written table back through Spark/JDBC, with the same connection settings
ctry_check_df = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", jdbc_conn_str)
    .option("dbtable", "country_info2")
    .option("user", pg_user)
    .option("password", pg_passwd)
    .option("sslmode", "disable")
    .load()
)

# Actually check the round trip instead of only eyeballing the output
assert ctry_check_df.columns == ctry_df.columns, (
    f"Column mismatch after the JDBC round trip: {ctry_check_df.columns} != {ctry_df.columns}"
)
assert ctry_check_df.count() == ctry_df.count(), "Row count mismatch after the JDBC round trip"

ctry_check_df.toPandas()

Unnamed: 0,iso_alpha2,iso_alpha3,iso_numeric,fips_code,name,capital,areainsqkm,population,continent,tld,currency_code,currency_name,phone,postal_code_format,postal_code_regex,languages,geonameId,neighbours,equivalent_fips_code
0,AD,AND,20,AN,Andorra,Andorra la Vella,468.0,77006,EU,.ad,EUR,Euro,376,AD###,^(?:AD)*(\d{3})$,ca,3041565,"ES,FR",
1,AE,ARE,784,AE,United Arab Emirates,Abu Dhabi,82880.0,9630959,AS,.ae,AED,Dirham,971,,,"ar-AE,fa,en,hi,ur",290557,"SA,OM",
2,AF,AFG,4,AF,Afghanistan,Kabul,647500.0,37172386,AS,.af,AFN,Afghani,93,,,"fa-AF,ps,uz-AF,tk",1149361,"TM,CN,IR,TJ,PK,UZ",
3,AG,ATG,28,AC,Antigua and Barbuda,St. John's,443.0,96286,,.ag,XCD,Dollar,+1-268,,,en-AG,3576396,,
4,AI,AIA,660,AV,Anguilla,The Valley,102.0,13254,,.ai,XCD,Dollar,+1-264,,,en-AI,3573511,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
247,ZA,ZAF,710,SF,South Africa,Pretoria,1219912.0,57779622,AF,.za,ZAR,Rand,27,####,^(\d{4})$,"zu,xh,af,nso,en-ZA,tn,st,ts,ss,ve,nr",953987,"ZW,SZ,MZ,BW,NA,LS",
248,ZM,ZMB,894,ZA,Zambia,Lusaka,752614.0,17351822,AF,.zm,ZMW,Kwacha,260,#####,^(\d{5})$,"en-ZM,bem,loz,lun,lue,ny,toi",895949,"ZW,TZ,MZ,CD,NA,MW,AO",
249,ZW,ZWE,716,ZI,Zimbabwe,Harare,390580.0,14439018,AF,.zw,ZWL,Dollar,263,,,"en-ZW,sn,nr,nd",878675,"ZA,MZ,BW,ZM",
250,CS,SCG,891,YI,Serbia and Montenegro,Belgrade,102350.0,10829175,EU,.cs,RSD,Dinar,381,#####,^(\d{5})$,"cu,hu,sq,sr",8505033,"AL,HU,MK,RO,HR,BA,BG",


# Check with SQLAlchemy that the table has been created

In [9]:
import pandas as pd
from sqlalchemy import create_engine

In [10]:
# Read the table back through SQLAlchemy + pandas, independently of Spark/JDBC
geo_countries_select_query: str = "select * from country_info2;"
pg_connstr: str = confmgr.get_db_conn_string(verbose=True)
engine = create_engine(pg_connstr)
df: pd.DataFrame = pd.read_sql_query(sql=geo_countries_select_query, con=engine)
df

As the 'passwd' field is left empty in the 'config.json' configuration file, the password will be read from the ~/.pgpass secret file


Unnamed: 0,iso_alpha2,iso_alpha3,iso_numeric,fips_code,name,capital,areainsqkm,population,continent,tld,currency_code,currency_name,phone,postal_code_format,postal_code_regex,languages,geonameId,neighbours,equivalent_fips_code
0,AD,AND,20,AN,Andorra,Andorra la Vella,468.0,77006,EU,.ad,EUR,Euro,376,AD###,^(?:AD)*(\d{3})$,ca,3041565,"ES,FR",
1,AE,ARE,784,AE,United Arab Emirates,Abu Dhabi,82880.0,9630959,AS,.ae,AED,Dirham,971,,,"ar-AE,fa,en,hi,ur",290557,"SA,OM",
2,AF,AFG,4,AF,Afghanistan,Kabul,647500.0,37172386,AS,.af,AFN,Afghani,93,,,"fa-AF,ps,uz-AF,tk",1149361,"TM,CN,IR,TJ,PK,UZ",
3,AG,ATG,28,AC,Antigua and Barbuda,St. John's,443.0,96286,,.ag,XCD,Dollar,+1-268,,,en-AG,3576396,,
4,AI,AIA,660,AV,Anguilla,The Valley,102.0,13254,,.ai,XCD,Dollar,+1-264,,,en-AI,3573511,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
247,ZA,ZAF,710,SF,South Africa,Pretoria,1219912.0,57779622,AF,.za,ZAR,Rand,27,####,^(\d{4})$,"zu,xh,af,nso,en-ZA,tn,st,ts,ss,ve,nr",953987,"ZW,SZ,MZ,BW,NA,LS",
248,ZM,ZMB,894,ZA,Zambia,Lusaka,752614.0,17351822,AF,.zm,ZMW,Kwacha,260,#####,^(\d{5})$,"en-ZM,bem,loz,lun,lue,ny,toi",895949,"ZW,TZ,MZ,CD,NA,MW,AO",
249,ZW,ZWE,716,ZI,Zimbabwe,Harare,390580.0,14439018,AF,.zw,ZWL,Dollar,263,,,"en-ZW,sn,nr,nd",878675,"ZA,MZ,BW,ZM",
250,CS,SCG,891,YI,Serbia and Montenegro,Belgrade,102350.0,10829175,EU,.cs,RSD,Dinar,381,#####,^(\d{5})$,"cu,hu,sq,sr",8505033,"AL,HU,MK,RO,HR,BA,BG",
