In [6]:
import findspark
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover 

In [None]:
findspark.init()
spark = SparkSession.builder.getOrCreate()

In [3]:
df = (spark.read 
    .format("jdbc") 
    .option("url", "jdbc:postgresql://34.133.240.67/dbname") 
    .option("dbtable", "public.movie_review_bronze") 
    .option("user", "dbuser")
    .option("password", "dbpassword") 
    .option("driver", "org.postgresql.Driver")
    .load())

In [4]:
#df.printSchema()

root
 |-- cid: integer (nullable = true)
 |-- review_str: string (nullable = true)



In [11]:

tokenizer = Tokenizer(outputCol="words")
tokenizer.setInputCol("review_str")

remover = StopWordsRemover()
remover.setInputCol("words")
remover.setOutputCol("clean_words")


df = tokenizer.transform(df)
df = remover.transform(df)

In [12]:
#df.printSchema()

root
 |-- cid: integer (nullable = true)
 |-- review_str: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- clean_words: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [16]:
df = df.withColumn("positive_review", array_contains(df.clean_words, "good").cast('integer'))

In [14]:
#df.printSchema()

root
 |-- cid: integer (nullable = true)
 |-- review_str: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- clean_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- positive_review: boolean (nullable = true)



In [18]:
#df.select("positive_review").head(20)

[Row(positive_review=0),
 Row(positive_review=0),
 Row(positive_review=0),
 Row(positive_review=1),
 Row(positive_review=0),
 Row(positive_review=0),
 Row(positive_review=0),
 Row(positive_review=1),
 Row(positive_review=1),
 Row(positive_review=0),
 Row(positive_review=0),
 Row(positive_review=0),
 Row(positive_review=1),
 Row(positive_review=1),
 Row(positive_review=0),
 Row(positive_review=0),
 Row(positive_review=0),
 Row(positive_review=0),
 Row(positive_review=0),
 Row(positive_review=1)]

In [20]:
write_df = df.select("cid", "positive_review")

In [21]:
#write_df.printSchema()

root
 |-- cid: integer (nullable = true)
 |-- positive_review: integer (nullable = true)



In [27]:
table_name = "reviews"
(write_df.write 
    .format("jdbc") 
    .option("url", "jdbc:postgresql://34.133.240.67/dbname") 
    .option("dbtable", f"silver.{table_name}") 
    .option("user", "dbuser")
    .option("password", "dbpassword") 
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .option("truncate", True)
    .save())

In [28]:
print("saved table to postgres")

saved table to postgres
