In [1]:
import findspark

findspark.add_jars('/app/postgresql-42.1.4.jar')
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Stocks:ETL")
    .config("spark.driver.memory", "512m")
    .config("spark.driver.cores", "1")
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "1")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

In [3]:
spark.version

'2.4.5'

In [4]:
stocks_dir = '/dataset/stocks-small'

In [5]:
import sys

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [6]:
df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv(stocks_dir)

In [7]:
df.count()
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- OpenInt: integer (nullable = true)



In [8]:
df.show()

+-------------------+------+------+------+------+------+-------+
|               Date|  Open|  High|   Low| Close|Volume|OpenInt|
+-------------------+------+------+------+------+------+-------+
|1962-01-02 00:00:00| 6.413| 6.413|6.3378|6.3378|467056|      0|
|1962-01-03 00:00:00|6.3378|6.3963|6.3378|6.3963|350294|      0|
|1962-01-04 00:00:00|6.3963|6.3963|6.3295|6.3295|314365|      0|
|1962-01-05 00:00:00|6.3211|6.3211|6.1958|6.2041|440112|      0|
|1962-01-08 00:00:00|6.2041|6.2041|6.0373| 6.087|655676|      0|
|1962-01-09 00:00:00|6.1208|6.2376|6.1208|6.1621|592806|      0|
|1962-01-10 00:00:00|6.1707|6.2041|6.1707|6.1707|359274|      0|
|1962-01-11 00:00:00|6.1875|6.2376|6.1875|6.2376|386220|      0|
|1962-01-12 00:00:00|6.2543|6.2962|6.2543|6.2543|529933|      0|
|1962-01-15 00:00:00|6.2708|6.2962|6.2708|6.2792|305383|      0|
|1962-01-16 00:00:00|6.2708|6.2708|6.2128|6.2128|305383|      0|
|1962-01-17 00:00:00|6.1875|6.1875|6.0956|6.1125|502984|      0|
|1962-01-18 00:00:00|6.12

In [9]:
df = df.withColumn('filename', F.input_file_name())

In [10]:
df.show(truncate=False)

+-------------------+------+------+------+------+------+-------+---------------------------------------+
|Date               |Open  |High  |Low   |Close |Volume|OpenInt|filename                               |
+-------------------+------+------+------+------+------+-------+---------------------------------------+
|1962-01-02 00:00:00|6.413 |6.413 |6.3378|6.3378|467056|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-03 00:00:00|6.3378|6.3963|6.3378|6.3963|350294|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-04 00:00:00|6.3963|6.3963|6.3295|6.3295|314365|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-05 00:00:00|6.3211|6.3211|6.1958|6.2041|440112|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-08 00:00:00|6.2041|6.2041|6.0373|6.087 |655676|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-09 00:00:00|6.1208|6.2376|6.1208|6.1621|592806|0      |file:///dataset/stocks-small/ibm.us.txt|
|1962-01-10 00:00:00|6.1707|6.2041|6.1707|6.1707|359274

In [11]:
df_lookup = spark.read.csv('/dataset/yahoo-symbols-201709.csv')

In [12]:
df_lookup.show()

+------+--------------------+--------+--------------------+-------+
|   _c0|                 _c1|     _c2|                 _c3|    _c4|
+------+--------------------+--------+--------------------+-------+
|Ticker|                Name|Exchange|       Category Name|Country|
|  OEDV|Osage Exploration...|     PNK|                null|    USA|
|  AAPL|          Apple Inc.|     NMS|Electronic Equipment|    USA|
|   BAC|Bank of America C...|     NYQ|  Money Center Banks|    USA|
|  AMZN|    Amazon.com, Inc.|     NMS|Catalog & Mail Or...|    USA|
|     T|           AT&T Inc.|     NYQ|Telecom Services ...|    USA|
|  GOOG|       Alphabet Inc.|     NMS|Internet Informat...|    USA|
|    MO|  Altria Group, Inc.|     NYQ|          Cigarettes|    USA|
|   DAL|Delta Air Lines, ...|     NYQ|      Major Airlines|    USA|
|    AA|   Alcoa Corporation|     NYQ|            Aluminum|    USA|
|   AXP|American Express ...|     NYQ|     Credit Services|    USA|
|    DD|E. I. du Pont de ...|     NYQ|Agricultur

In [13]:
def extract_symbol_from(filename):
    return filename.split('/')[-1].split('.')[0].upper()

In [14]:
# filename = 'file:///dataset/stocks-small/ibm.us.txt' # => IBM
extract_symbol_from('file:///dataset/stocks-small/ibm.us.txt')

'IBM'

In [15]:
extract_symbol = F.udf(lambda filename: extract_symbol_from(filename), StringType())

In [16]:
stocks_folder = stocks_dir
df = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .csv(stocks_folder) \
        .withColumn("name", extract_symbol(F.input_file_name()))

In [17]:
df.show(5)

+-------------------+------+------+------+------+------+-------+----+
|               Date|  Open|  High|   Low| Close|Volume|OpenInt|name|
+-------------------+------+------+------+------+------+-------+----+
|1962-01-02 00:00:00| 6.413| 6.413|6.3378|6.3378|467056|      0| IBM|
|1962-01-03 00:00:00|6.3378|6.3963|6.3378|6.3963|350294|      0| IBM|
|1962-01-04 00:00:00|6.3963|6.3963|6.3295|6.3295|314365|      0| IBM|
|1962-01-05 00:00:00|6.3211|6.3211|6.1958|6.2041|440112|      0| IBM|
|1962-01-08 00:00:00|6.2041|6.2041|6.0373| 6.087|655676|      0| IBM|
+-------------------+------+------+------+------+------+-------+----+
only showing top 5 rows



In [18]:
df = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .csv(stocks_folder) \
        .withColumn("name", extract_symbol(F.input_file_name())) \
        .withColumnRenamed("Date", "dateTime") \
        .withColumnRenamed("Open", "open") \
        .withColumnRenamed("High", "high") \
        .withColumnRenamed("Low", "low") \
        .withColumnRenamed("Close", "close") \
        .drop("Volume", "OpenInt")

In [19]:
df_stocks = df

In [20]:
df_stocks.show(5)

+-------------------+------+------+------+------+----+
|           dateTime|  open|  high|   low| close|name|
+-------------------+------+------+------+------+----+
|1962-01-02 00:00:00| 6.413| 6.413|6.3378|6.3378| IBM|
|1962-01-03 00:00:00|6.3378|6.3963|6.3378|6.3963| IBM|
|1962-01-04 00:00:00|6.3963|6.3963|6.3295|6.3295| IBM|
|1962-01-05 00:00:00|6.3211|6.3211|6.1958|6.2041| IBM|
|1962-01-08 00:00:00|6.2041|6.2041|6.0373| 6.087| IBM|
+-------------------+------+------+------+------+----+
only showing top 5 rows



In [21]:
lookup_file = '/dataset/yahoo-symbols-201709.csv'

In [22]:
symbols_lookup = spark.read. \
        option("header", True). \
        option("inferSchema", True). \
        csv(lookup_file). \
        select("Ticker", "Category Name"). \
        withColumnRenamed("Ticker", "symbol"). \
        withColumnRenamed("Category Name", "category")

In [23]:
df_stocks.show(3)
symbols_lookup.show(3)

+-------------------+------+------+------+------+----+
|           dateTime|  open|  high|   low| close|name|
+-------------------+------+------+------+------+----+
|1962-01-02 00:00:00| 6.413| 6.413|6.3378|6.3378| IBM|
|1962-01-03 00:00:00|6.3378|6.3963|6.3378|6.3963| IBM|
|1962-01-04 00:00:00|6.3963|6.3963|6.3295|6.3295| IBM|
+-------------------+------+------+------+------+----+
only showing top 3 rows

+------+--------------------+
|symbol|            category|
+------+--------------------+
|  OEDV|                null|
|  AAPL|Electronic Equipment|
|   BAC|  Money Center Banks|
+------+--------------------+
only showing top 3 rows



In [24]:
joined_df = df_stocks \
    .withColumnRenamed('dateTime', "full_date") \
    .filter("full_date >= \"2017-09-01\"") \
    .withColumn("year", F.year("full_date")) \
    .withColumn("month", F.month("full_date")) \
    .withColumn("day", F.dayofmonth("full_date")) \
    .withColumnRenamed("name", "symbol") \
    .join(symbols_lookup, ["symbol"])

In [25]:
joined_df.show(3)

+------+-------------------+------+------+------+------+----+-----+---+--------------------+
|symbol|          full_date|  open|  high|   low| close|year|month|day|            category|
+------+-------------------+------+------+------+------+----+-----+---+--------------------+
|   IBM|2017-09-01 00:00:00|141.57|143.07|141.57|142.65|2017|    9|  1|Information Techn...|
|   IBM|2017-09-05 00:00:00|142.08|142.93|141.29|141.62|2017|    9|  5|Information Techn...|
|   IBM|2017-09-06 00:00:00|142.46|143.04|142.08| 142.4|2017|    9|  6|Information Techn...|
+------+-------------------+------+------+------+------+----+-----+---+--------------------+
only showing top 3 rows



In [26]:
window20 = (Window.partitionBy(F.col('symbol')).orderBy(F.col("full_date")).rowsBetween(-20, 0))
window50 = (Window.partitionBy(F.col('symbol')).orderBy(F.col("full_date")).rowsBetween(-50, 0))
window100 = (Window.partitionBy(F.col('symbol')).orderBy(F.col("full_date")).rowsBetween(-100, 0))

In [27]:
stocks_moving_avg_df = joined_df \
    .withColumn("ma20", F.avg("close").over(window20)) \
    .withColumn("ma50", F.avg("close").over(window50)) \
    .withColumn("ma100", F.avg("close").over(window100))

In [28]:
# Moving Average
stocks_moving_avg_df.select('symbol', 'close', 'ma20').show(25)

+------+------+------------------+
|symbol| close|              ma20|
+------+------+------------------+
|  AAPL|163.46|            163.46|
|  AAPL| 161.5|162.48000000000002|
|  AAPL|161.33| 162.0966666666667|
|  AAPL|160.68|          161.7425|
|  AAPL|158.06|           161.006|
|  AAPL|160.92|160.99166666666665|
|  AAPL|160.28|160.89000000000001|
|  AAPL|159.08|         160.66375|
|  AAPL|157.71|160.33555555555554|
|  AAPL|159.31|           160.233|
|  AAPL| 158.1| 160.0390909090909|
|  AAPL|158.16|          159.8825|
|  AAPL|155.51|159.54615384615383|
|  AAPL|152.84|159.06714285714287|
|  AAPL|151.35|158.55266666666665|
|  AAPL|150.01|         158.01875|
|  AAPL|152.59| 157.6994117647059|
|  AAPL|153.68|157.47611111111112|
|  AAPL|152.73| 157.2263157894737|
|  AAPL|153.57|157.04350000000002|
|  AAPL|153.26|156.86333333333334|
|  AAPL|153.92| 156.4090476190476|
|  AAPL|152.93|156.00095238095238|
|  AAPL|154.83|155.69142857142856|
|  AAPL|154.74|155.40857142857143|
+------+------+-----

In [29]:
output_dir = '/dataset/output.parquet'

In [30]:
stocks_moving_avg_df \
    .write \
    .mode('overwrite') \
    .partitionBy("year", "month", "day") \
    .parquet(output_dir)

In [31]:
df_parquet = spark.read.parquet(output_dir)

In [32]:
df_parquet.count()

950

In [33]:
df_parquet.createOrReplaceTempView("stocks")

In [34]:
badHighestClosingPrice = spark.sql("SELECT symbol, MAX(close) AS price FROM stocks WHERE full_date >= '2017-09-01' AND full_date < '2017-10-01' GROUP BY symbol")
badHighestClosingPrice.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[symbol#559], functions=[max(close#564)])
+- Exchange hashpartitioning(symbol#559, 2)
   +- *(1) HashAggregate(keys=[symbol#559], functions=[partial_max(close#564)])
      +- *(1) Project [symbol#559, close#564]
         +- *(1) Filter ((isnotnull(full_date#560) && (cast(full_date#560 as string) >= 2017-09-01)) && (cast(full_date#560 as string) < 2017-10-01))
            +- *(1) FileScan parquet [symbol#559,full_date#560,close#564,year#569,month#570,day#571] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/dataset/output.parquet], PartitionCount: 50, PartitionFilters: [], PushedFilters: [IsNotNull(full_date)], ReadSchema: struct<symbol:string,full_date:timestamp,close:double>


In [35]:
highestClosingPrice = spark.sql("SELECT symbol, MAX(close) AS price FROM stocks WHERE year=2017 AND month=9 GROUP BY symbol")
highestClosingPrice.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[symbol#559], functions=[max(close#564)])
+- Exchange hashpartitioning(symbol#559, 2)
   +- *(1) HashAggregate(keys=[symbol#559], functions=[partial_max(close#564)])
      +- *(1) Project [symbol#559, close#564]
         +- *(1) FileScan parquet [symbol#559,close#564,year#569,month#570,day#571] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/dataset/output.parquet], PartitionCount: 20, PartitionFilters: [isnotnull(year#569), isnotnull(month#570), (year#569 = 2017), (month#570 = 9)], PushedFilters: [], ReadSchema: struct<symbol:string,close:double>


In [36]:
# Write to Postgres
stocks_moving_avg_df \
    .drop("year", "month", "day") \
    .write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres/workshop") \
    .option("dbtable", "workshop.stocks") \
    .option("user", "workshop") \
    .option("password", "w0rkzh0p") \
    .option("driver", "org.postgresql.Driver") \
    .mode('append') \
    .save()

Py4JJavaError: An error occurred while calling o219.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 34.0 failed 4 times, most recent failure: Lost task 0.3 in stage 34.0 (TID 160, 172.19.0.12, executor 1): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO workshop.stocks ("symbol","full_date","open","high","low","close","category","ma20","ma50","ma100") VALUES ('AAPL','2017-09-01 00:00:00+00'::timestamp with time zone,164.21,164.35,163.04,163.46,'Electronic Equipment',163.46,163.46,163.46) was aborted: ERROR: duplicate key value violates unique constraint "stocks_pkey"
  Detail: Key (full_date, symbol)=(2017-09-01 00:00:00+00, AAPL) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2191)
	at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1325)
	at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1350)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:458)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:791)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1563)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:676)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "stocks_pkey"
  Detail: Key (full_date, symbol)=(2017-09-01 00:00:00+00, AAPL) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2477)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2190)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:978)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:978)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO workshop.stocks ("symbol","full_date","open","high","low","close","category","ma20","ma50","ma100") VALUES ('AAPL','2017-09-01 00:00:00+00'::timestamp with time zone,164.21,164.35,163.04,163.46,'Electronic Equipment',163.46,163.46,163.46) was aborted: ERROR: duplicate key value violates unique constraint "stocks_pkey"
  Detail: Key (full_date, symbol)=(2017-09-01 00:00:00+00, AAPL) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2191)
	at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1325)
	at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1350)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:458)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:791)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1563)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:676)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "stocks_pkey"
  Detail: Key (full_date, symbol)=(2017-09-01 00:00:00+00, AAPL) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2477)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2190)
	... 20 more


In [37]:
consult = spark.sql("SELECT * FROM stocks")

In [38]:
consult.show()

+------+-------------------+-------+-------+-------+------+--------------------+------------------+------------------+------------------+----+-----+---+
|symbol|          full_date|   open|   high|    low| close|            category|              ma20|              ma50|             ma100|year|month|day|
+------+-------------------+-------+-------+-------+------+--------------------+------------------+------------------+------------------+----+-----+---+
|  AAPL|2017-09-08 00:00:00| 160.28| 160.57| 157.96|158.06|Electronic Equipment|           161.006|           161.006|           161.006|2017|    9|  8|
|  BABA|2017-09-08 00:00:00| 169.99| 171.22| 168.92| 169.0|Specialty Retail,...|169.71599999999998|169.71599999999998|169.71599999999998|2017|    9|  8|
|  EBAY|2017-09-08 00:00:00|  38.04|   38.3| 37.725| 37.79|Specialty Retail,...| 36.99999999999999| 36.99999999999999| 36.99999999999999|2017|    9|  8|
|    FB|2017-09-08 00:00:00| 173.09| 173.49|  170.8|170.95|Internet Informat...|  

In [None]:
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres/workshop") \
    .option("dbtable", "workshop.stocks") \
    .option("user", "workshop") \
    .option("password", "w0rkzh0p") \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [39]:
spark.stop()