# Spark session

In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) a local Spark session named "load-postgres".
# The PostgreSQL JDBC driver jar is placed on the driver classpath so the
# "jdbc" data source used further down can connect to Postgres.
spark = (
    SparkSession.builder
    .master("local")
    .appName("load-postgres")
    .config(
        "spark.driver.extraClassPath",
        "/usr/local/spark/resources/jars/postgresql-9.4.1207.jar",
    )
    .getOrCreate()
)

# Handle on the underlying SparkContext of the session above.
sc = spark.sparkContext

21/12/29 16:06:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Read CSV data

In [2]:
# Load the movies CSV; the first line of the file is used as the column names.
# No schema is supplied or inferred here.
df_movies_csv = spark.read.csv(
    "/home/jovyan/work/data/movies.csv",
    header=True,
)

In [3]:
# Load the ratings CSV (first line = column names) and rename the raw
# "timestamp" column to "timestamp_epoch", freeing the name "timestamp"
# for the converted column built in a later cell.
df_ratings_csv = (
    spark.read
    .csv("/home/jovyan/work/data/ratings.csv", header=True)
    .withColumnRenamed("timestamp", "timestamp_epoch")
)

In [4]:
# Convert epoch to timestamp and rating to DoubleType
from pyspark.sql.functions import from_unixtime, col, to_timestamp
from pyspark.sql.types import DoubleType

# Typed version of the ratings frame:
#   * "rating" is cast to double.
#   * "timestamp" is derived from the epoch-seconds column with a numeric cast
#     (string -> long -> timestamp). Spark interprets a long cast to timestamp
#     as seconds since the Unix epoch, so the instant is preserved exactly.
#     The previous from_unixtime -> to_timestamp chain formatted the epoch as a
#     local-time string and parsed it back, which depends on the session time
#     zone and is ambiguous during a daylight-saving fall-back hour.
# "timestamp_epoch" is kept on this frame; it is removed only when writing.
df_ratings_csv_fmt = (
    df_ratings_csv
    .withColumn("rating", col("rating").cast(DoubleType()))
    .withColumn("timestamp", col("timestamp_epoch").cast("long").cast("timestamp"))
)

# Load data to Postgres

In [6]:
# Write the movies frame to Postgres table public.movies over JDBC,
# replacing any existing table contents ("overwrite" mode).
# NOTE(review): the connection credentials are hard-coded in this cell;
# consider sourcing them from the environment before sharing the notebook.
(
    df_movies_csv
    .write
    .format("jdbc")
    .options(
        url="jdbc:postgresql://postgres/test",
        dbtable="public.movies",
        user="test",
        password="postgres",
    )
    .mode("overwrite")
    .save()
)

In [7]:
# Write the typed ratings frame to Postgres table public.ratings over JDBC,
# replacing any existing table contents ("overwrite" mode).
# The raw "timestamp_epoch" column is dropped first; every other column is
# written in its existing order.
# NOTE(review): the connection credentials are hard-coded in this cell;
# consider sourcing them from the environment before sharing the notebook.
(
    df_ratings_csv_fmt
    .drop("timestamp_epoch")
    .write
    .format("jdbc")
    .options(
        url="jdbc:postgresql://postgres/test",
        dbtable="public.ratings",
        user="test",
        password="postgres",
    )
    .mode("overwrite")
    .save()
)

                                                                                

In [7]:
# Preview: pull the first 10 rows of the raw ratings frame into pandas for display.
(
    df_ratings_csv
    .limit(10)
    .toPandas()
)

Unnamed: 0,userId,movieId,rating,timestamp_epoch
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931
5,1,70,3.0,964982400
6,1,101,5.0,964980868
7,1,110,4.0,964982176
8,1,151,5.0,964984041
9,1,157,5.0,964984100


In [2]:
# Read table public.dag from the "airflow" Postgres database over JDBC.
# NOTE(review): despite its name, df_movies holds rows of public.dag here,
# not movie data; the name is kept because a later cell refers to it.
# NOTE(review): the connection credentials are hard-coded in this cell.
df_movies = (
    spark.read
    .format("jdbc")
    .options(
        url="jdbc:postgresql://postgres/airflow",
        dbtable="public.dag",
        user="airflow",
        password="airflow",
    )
    .load()
)

In [3]:
# Preview: pull the first 10 rows of the frame read over JDBC into pandas for display.
(
    df_movies
    .limit(10)
    .toPandas()
)

Unnamed: 0,dag_id,is_paused,is_subdag,is_active,last_scheduler_run,last_pickled,last_expired,scheduler_lock,pickle_id,fileloc,owners,description,default_view,schedule_interval,root_dag_id
0,spark-hello-world-module,True,False,True,2021-12-29 16:06:21.480572,NaT,NaT,,,/usr/local/airflow/dags/spark-hello-world-modu...,airflow,This DAG runs a Pyspark app that uses modules.,,"{""type"": ""timedelta"", ""attrs"": {""days"": 1, ""se...",
1,spark-test,True,False,True,2021-12-29 16:06:22.482156,NaT,NaT,,,/usr/local/airflow/dags/spark-test.py,airflow,This DAG runs a simple Pyspark app.,,"{""type"": ""timedelta"", ""attrs"": {""days"": 1, ""se...",
2,spark-postgres,True,False,True,2021-12-29 16:06:22.492430,NaT,NaT,,,/usr/local/airflow/dags/spark-postgres.py,airflow,This DAG is a sample of integration between Sp...,,"{""type"": ""timedelta"", ""attrs"": {""days"": 1, ""se...",
