In [12]:
##import required libraries
import os

from pyspark.sql import SparkSession


##create spark session
# The PostgreSQL JDBC driver jar has to be on the driver classpath for the JDBC
# reads and writes in this notebook. Its location can be supplied through the
# POSTGRES_JDBC_JAR environment variable; when that is unset, the original
# machine-specific path is used so existing setups keep working unchanged.
_jdbc_jar_path = os.environ.get(
    "POSTGRES_JDBC_JAR",
    "C:/Users/HanSAriEle/Downloads/postgresql-42.7.3.jar",
)

# One builder option per line so each setting can be read and changed on its own.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.extraClassPath", _jdbc_jar_path)
    .config("spark.master", "local[*]")
    .getOrCreate()
)

##shared JDBC reader used by the table-specific extract functions below
def _read_table_to_df(table_name):
    """Load one table of the films database into a Spark DataFrame over JDBC.

    Connection settings default to the values this notebook has always used
    and can be overridden, without editing the code, through the environment
    variables ``FILMS_DB_URL``, ``FILMS_DB_USER`` and ``FILMS_DB_PASSWORD``.

    Args:
        table_name: Name of the table to read (sent as the ``dbtable`` option).

    Returns:
        Whatever ``spark.read.format("jdbc")...load()`` returns for that table.
    """
    # NOTE(review): the literal fallbacks (including the password) are kept only
    # so existing runs behave exactly as before; prefer setting the environment
    # variables and dropping the literals once that is in place.
    jdbc_url = os.environ.get("FILMS_DB_URL", "jdbc:postgresql://localhost:5432/films")
    db_user = os.environ.get("FILMS_DB_USER", "postgres")
    db_password = os.environ.get("FILMS_DB_PASSWORD", "ha__ns")
    return (
        spark.read
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table_name)
        .option("user", db_user)
        .option("password", db_password)
        .option("driver", "org.postgresql.Driver")
        .load()
    )


##read movies table from db using spark
def extract_movies_to_df():
    """Return the ``movie`` table as a Spark DataFrame."""
    return _read_table_to_df("movie")


##read users table from db using spark
def extract_users_to_df():
    """Return the ``users`` table as a Spark DataFrame."""
    return _read_table_to_df("users")


def transform_avg_ratings(movies_df, users_df):
    """Attach the mean rating of each movie to its row in ``movies_df``.

    Args:
        movies_df: Frame exposing an ``id`` column.
        users_df: Frame exposing ``movie_id`` and ``rating`` columns.

    Returns:
        ``movies_df`` joined (with the default join type) to the per-``movie_id``
        mean of ``rating``, with the ``movie_id`` join key dropped.
    """
    # Mean of "rating" for every distinct movie_id.
    ratings_by_movie = users_df.groupBy("movie_id").mean("rating")
    # Match each movie to its aggregated rating row.
    same_movie = movies_df.id == ratings_by_movie.movie_id
    # movie_id duplicates movies_df.id after the join, so it is removed.
    return movies_df.join(ratings_by_movie, same_movie).drop("movie_id")


##load transformed dataframe to the database
def load_df_to_db(df):
    """Write ``df`` to the ``avg_ratings`` table over JDBC in overwrite mode.

    Connection settings default to the values this notebook has always used
    and can be overridden, without editing the code, through the environment
    variables ``FILMS_DB_URL``, ``FILMS_DB_USER`` and ``FILMS_DB_PASSWORD``.

    Args:
        df: Frame to persist; only its ``write.jdbc`` method is used.

    Returns:
        None.
    """
    # NOTE(review): the literal fallbacks (including the password) are kept only
    # so existing runs behave exactly as before; prefer setting the environment
    # variables and dropping the literals once that is in place.
    url = os.environ.get("FILMS_DB_URL", "jdbc:postgresql://localhost:5432/films")
    properties = {
        "user": os.environ.get("FILMS_DB_USER", "postgres"),
        "password": os.environ.get("FILMS_DB_PASSWORD", "ha__ns"),
        "driver": "org.postgresql.Driver",
    }
    # "overwrite": each run replaces whatever avg_ratings held before.
    df.write.jdbc(url=url,
                  table="avg_ratings",
                  mode="overwrite",
                  properties=properties)

if __name__ == "__main__":
    # Run the pipeline once: read both tables, join in the per-movie mean
    # rating, and write the result back to the database.
    # movies_df and users_df remain bound at module level after this block;
    # the following notebook cell passes them to transform_avg_ratings again.
    movies_df = extract_movies_to_df()
    users_df = extract_users_to_df()
    ratings_df = transform_avg_ratings(movies_df, users_df)
    load_df_to_db(ratings_df)


In [13]:
# Re-run the transform on the frames loaded by the previous cell; leaving the
# call as the cell's last expression makes the notebook show the resulting
# DataFrame's column names and types.
transform_avg_ratings(movies_df, users_df)

DataFrame[id: int, name: string, description: string, category: string, avg(rating): decimal(38,22)]

In [14]:
import pendulum
import airflow 
from airflow.operators.python import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta

## define the etl function
def etl():
    """Run the whole pipeline once: extract both tables, transform, load.

    Takes no arguments and returns None; this is the callable handed to the
    Airflow PythonOperator.
    """
    movies = extract_movies_to_df()
    users = extract_users_to_df()
    load_df_to_db(transform_avg_ratings(movies, users))

## define the arguments for the DAG
# Defaults passed to the DAG through its default_args parameter.
# NOTE(review): start_date is recomputed as "one day before today (UTC)" every
# time this code is evaluated — confirm that a moving start date is intended.
default_args = dict(
    start_date=pendulum.today('UTC').add(days=-1),
    owner='hansari',
    depends_on_past=True,
    email=['hansearieldo@gmail.com'],
    email_on_failure=True,
    email_on_retry=False,
    retries=3,
    retry_delay=timedelta(minutes=30),
)

## instantiate the DAG and define the etl task inside it
# Operators created inside the `with` block are attached to `dag`, so the
# explicit dag= argument is not needed. Both `dag` and `etl_task` remain
# available as module-level names afterwards.
with DAG(
    dag_id="films",
    default_args=default_args,
    schedule="0 0 * * *",
) as dag:
    etl_task = PythonOperator(
        task_id="etl_task",
        python_callable=etl,
    )

# Bare expression: the notebook shows the operator's repr as the cell output.
etl_task

<Task(PythonOperator): etl_task>

In [None]:
pip install apache-airflow

In [None]:
pip install waitress

In [None]:
pip install pendulum