spark-postgres.py
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta
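
# NOTE: these import paths (airflow.operators.dummy_operator and
# airflow.contrib.operators.spark_submit_operator) are the Airflow 1.x
# locations; on Airflow 2 the equivalents live in airflow.operators.empty
# and the apache-airflow-providers-apache-spark provider package.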
###############################################
# Parameters
###############################################
spark_master = "spark://spark:7077"
postgres_driver_jar = "/usr/local/spark/resources/jars/postgresql-9.4.1207.jar"
movies_file = "/usr/local/spark/resources/data/movies.csv"
ratings_file = "/usr/local/spark/resources/data/ratings.csv"
postgres_db = "jdbc:postgresql://postgres/test"
postgres_user = "test"
postgres_pwd = "postgres"
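
# NOTE: "spark" and "postgres" above are presumably Docker service names
# resolved over a shared container network; the JDBC URL points at database
# "test" on the default Postgres port 5432.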
###############################################
# DAG Definition
###############################################
now = datetime.now()

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(now.year, now.month, now.day),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}
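
# NOTE: a dynamic start_date (midnight of the day this file is parsed) is
# generally discouraged in Airflow; a fixed datetime keeps scheduling
# deterministic across scheduler restarts.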
dag = DAG(
    dag_id="spark-postgres",
    description="Sample integration between Spark and a database: reads CSV files, loads them into a Postgres DB, and then reads them back from the same Postgres DB.",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)
start = DummyOperator(task_id="start", dag=dag)
spark_job_load_postgres = SparkSubmitOperator(
    task_id="spark_job_load_postgres",
    # Spark application path, available to both the Airflow and Spark containers
    application="/usr/local/spark/app/load-postgres.py",
    name="load-postgres",
    conn_id="spark_default",
    verbose=1,
    conf={"spark.master": spark_master},
    application_args=[movies_file, ratings_file, postgres_db, postgres_user, postgres_pwd],
    jars=postgres_driver_jar,
    driver_class_path=postgres_driver_jar,
    dag=dag,
)
spark_job_read_postgres = SparkSubmitOperator(
    task_id="spark_job_read_postgres",
    # Spark application path, available to both the Airflow and Spark containers
    application="/usr/local/spark/app/read-postgres.py",
    name="read-postgres",
    conn_id="spark_default",
    verbose=1,
    conf={"spark.master": spark_master},
    application_args=[postgres_db, postgres_user, postgres_pwd],
    jars=postgres_driver_jar,
    driver_class_path=postgres_driver_jar,
    dag=dag,
)
end = DummyOperator(task_id="end", dag=dag)
start >> spark_job_load_postgres >> spark_job_read_postgres >> end
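
# ---------------------------------------------------------------------------
# For reference: a minimal, hypothetical sketch of how load-postgres.py could
# consume the positional application_args passed above. The real applications
# live under /usr/local/spark/app/ and are not shown here; this sketch assumes
# standard PySpark CSV and JDBC APIs and is kept commented out so it never
# executes during DAG parsing. read-postgres.py would mirror it, using
# spark.read.jdbc() with the same url/user/password arguments.
#
#   import sys
#   from pyspark.sql import SparkSession
#
#   movies_file, ratings_file, db_url, db_user, db_pwd = sys.argv[1:6]
#
#   spark = SparkSession.builder.appName("load-postgres").getOrCreate()
#
#   # Read the CSVs shipped with the cluster (header row, inferred types).
#   movies = spark.read.csv(movies_file, header=True, inferSchema=True)
#   ratings = spark.read.csv(ratings_file, header=True, inferSchema=True)
#
#   # Write each DataFrame to Postgres via the JDBC driver passed with jars=.
#   for df, table in [(movies, "movies"), (ratings, "ratings")]:
#       df.write.jdbc(
#           url=db_url,
#           table=table,
#           mode="overwrite",
#           properties={"user": db_user, "password": db_pwd,
#                       "driver": "org.postgresql.Driver"},
#       )
# ---------------------------------------------------------------------------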