In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, udf
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import time

In [37]:
spark = SparkSession.builder \
    .appName("Assignment") \
    .getOrCreate()

In [38]:
schema = StructType([
    StructField("serial_no", LongType(), True),
    StructField("ids", LongType(), True),
    StructField("date", StringType(), True),
    StructField("flag", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True)
])

In [39]:
dataset = "ProjectTweets.csv"
df = spark.read.csv(dataset, schema=schema, header=False)

In [40]:
df.show()

+---------+----------+--------------------+--------+---------------+--------------------+
|serial_no|       ids|                date|    flag|           user|                text|
+---------+----------+--------------------+--------+---------------+--------------------+
|        0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|        1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|        2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|        3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|        4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|        5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|        6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|        7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|        8

In [41]:
df = df.drop("serial_no")

In [42]:
df.printSchema()

root
 |-- ids: long (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [43]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


In [44]:
df = df.withColumn("date", to_timestamp(col("date"), "EEE MMM dd HH:mm:ss"))

In [45]:
df.show()

+----------+-------------------+--------+---------------+--------------------+
|       ids|               date|    flag|           user|                text|
+----------+-------------------+--------+---------------+--------------------+
|1467810369|1970-04-06 22:19:45|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|1467810672|1970-04-06 22:19:49|NO_QUERY|  scotthamilton|is upset that he ...|
|1467810917|1970-04-06 22:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|
|1467811184|1970-04-06 22:19:57|NO_QUERY|        ElleCTF|my whole body fee...|
|1467811193|1970-04-06 22:19:57|NO_QUERY|         Karoli|@nationwideclass ...|
|1467811372|1970-04-06 22:20:00|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|1467811592|1970-04-06 22:20:03|NO_QUERY|        mybirch|         Need a hug |
|1467811594|1970-04-06 22:20:03|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|1467811795|1970-04-06 22:20:05|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|1467812025|1970-04-06 22:20:09|NO_QUERY|        mim

In [46]:
df = df.drop("ids", "flag", "user")

In [47]:
df.show()

+-------------------+--------------------+
|               date|                text|
+-------------------+--------------------+
|1970-04-06 22:19:45|@switchfoot http:...|
|1970-04-06 22:19:49|is upset that he ...|
|1970-04-06 22:19:53|@Kenichan I dived...|
|1970-04-06 22:19:57|my whole body fee...|
|1970-04-06 22:19:57|@nationwideclass ...|
|1970-04-06 22:20:00|@Kwesidei not the...|
|1970-04-06 22:20:03|         Need a hug |
|1970-04-06 22:20:03|@LOLTrish hey  lo...|
|1970-04-06 22:20:05|@Tatiana_K nope t...|
|1970-04-06 22:20:09|@twittera que me ...|
|1970-04-06 22:20:16|spring break in p...|
|1970-04-06 22:20:17|I just re-pierced...|
|1970-04-06 22:20:19|@caregiving I cou...|
|1970-04-06 22:20:19|@octolinz16 It it...|
|1970-04-06 22:20:20|@smarrison i woul...|
|1970-04-06 22:20:20|@iamjazzyfizzle I...|
|1970-04-06 22:20:22|Hollis' death sce...|
|1970-04-06 22:20:25|about to file taxes |
|1970-04-06 22:20:31|@LettyA ahh ive a...|
|1970-04-06 22:20:34|@FakerPattyPattz ...|
+----------

In [48]:
df.head(5)

[Row(date=datetime.datetime(1970, 4, 6, 22, 19, 45), text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D"),
 Row(date=datetime.datetime(1970, 4, 6, 22, 19, 49), text="is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!"),
 Row(date=datetime.datetime(1970, 4, 6, 22, 19, 53), text='@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds'),
 Row(date=datetime.datetime(1970, 4, 6, 22, 19, 57), text='my whole body feels itchy and like its on fire '),
 Row(date=datetime.datetime(1970, 4, 6, 22, 19, 57), text="@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there. ")]

In [52]:
# Define functions for storing data into SQL and NoSQL databases
def store_data_in_sql(data):
    start_time = time.time()
    df.write.format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/database_name") \
        .option("dbtable", "table_name") \
        .option("user", "username") \
        .option("password", "password") \
        .save()
    end_time = time.time()
    return end_time - start_time


In [53]:
def store_data_in_nosql(data):
    start_time = time.time()
    df.write.format("mongo") \
        .option("uri", "mongodb://localhost:27017/database.collection") \
        .mode("overwrite") \
        .save()
    end_time = time.time()
    return end_time - start_time

In [54]:
# Store data into SQL and NoSQL databases and measure time taken
sql_insertion_time = store_data_in_sql(df)
nosql_insertion_time = store_data_in_nosql(df)

Py4JJavaError: An error occurred while calling o138.save.
: java.sql.SQLException: No suitable driver
	at java.sql.DriverManager.getDriver(Unknown Source)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:109)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:109)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:254)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:258)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
