## From CSV to MySQL to Cassandra

In this notebook I'll be using PySpark to:

1 - read the tweets CSV file into a PySpark dataframe

2 - save the PySpark dataframe into a MySQL table (raw data, before map-reduce)

3 - read the data back from the MySQL table

4 - apply reduce operations and data transformations to the dataframe

5 - apply data cleaning, data engineering and sentiment analysis

6 - save the resulting dataframe (post map-reduce) into Cassandra

7 - read from Cassandra and create a CSV as output, to continue with time series analysis in another notebook


#### Connectors for MySQL and Cassandra
pyspark --jars mysql-connector-j-8.1.0.jar --packages com.datastax.spark:spark-cassandra-connector_2.12:3.1.0

In [127]:
#create spark session
import warnings
from pyspark.sql import SparkSession
spark = SparkSession.builder \
  .appName("MyApp") \
  .config("spark.jars",  "mysql-connector-j-8.1.0.jar") \
  .master("local")\
  .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
warnings.filterwarnings("ignore")
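
The connector flags passed to `pyspark` on the command line could also be set on the session builder, which may help when the notebook is started some other way. This is only a sketch under assumptions: `connector_conf` is a hypothetical helper, the host and port are guesses for a local Cassandra node, and `spark.jars.packages` only takes effect if it is set before the first session is created.

```python
def connector_conf(cassandra_host="127.0.0.1", cassandra_port=9042):
    """Build Spark settings for the MySQL and Cassandra connectors (hypothetical helper)."""
    return {
        # local JDBC driver jar, same file as in the pyspark command above
        "spark.jars": "mysql-connector-j-8.1.0.jar",
        # Maven coordinates of the Cassandra connector, resolved at startup
        "spark.jars.packages": "com.datastax.spark:spark-cassandra-connector_2.12:3.1.0",
        # assumed address of a local Cassandra node
        "spark.cassandra.connection.host": cassandra_host,
        "spark.cassandra.connection.port": str(cassandra_port),
    }

conf = connector_conf()

# Usage with pyspark (not executed here):
# builder = SparkSession.builder.appName("MyApp").master("local")
# for key, value in conf.items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
```

Keeping the settings in one dictionary means the MySQL and Cassandra steps can share a single session instead of building a second one later.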

The CSV could be loaded into MySQL directly and then read with PySpark.

In this case I'm uploading the CSV to MySQL using PySpark and then reading it back from the MySQL table, to keep everything in one notebook.

In [128]:
#pyspark read from csv
data = spark.read.csv("/user1/ProjectTweets.csv", inferSchema=True)
data.show()



+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nop

                                                                                

In [129]:
#pyspark write into Mysql table
# the MySQL database is called Tweets and the table is also called Tweets; the schema already exists in MySQL (created through the CLI)
data.write \
  .format("jdbc") \
  .mode("overwrite") \
  .option("url", "jdbc:mysql://localhost:3306/Tweets") \
  .option("dbtable", "Tweets") \
  .option("user", "root") \
  .option("password", "password") \
  .save()
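
The write above keeps the user name and password in the notebook. One possible alternative is to collect the JDBC options in a dictionary and read the credentials from the environment. The helper name `mysql_jdbc_options` and the variable names `MYSQL_USER` and `MYSQL_PASSWORD` are assumptions made for this sketch, not something the notebook defines.

```python
import os

def mysql_jdbc_options(table, database="Tweets", host="localhost", port=3306):
    """Collect JDBC options, reading credentials from the environment (hypothetical helper)."""
    return {
        "url": f"jdbc:mysql://{host}:{port}/{database}",
        "dbtable": table,
        # MYSQL_USER and MYSQL_PASSWORD are assumed variable names
        "user": os.environ.get("MYSQL_USER", "root"),
        "password": os.environ.get("MYSQL_PASSWORD", ""),
    }

opts = mysql_jdbc_options("Tweets")

# Usage with pyspark (not executed here):
# data.write.format("jdbc").mode("overwrite").options(**opts).save()
```

The same dictionary could be passed to `spark.read.format("jdbc").options(**opts).load()` when reading the table back.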

                                                                                

In [130]:
#pyspark read from the MySQL table we just wrote
#com.mysql.cj.jdbc.Driver is the driver class of Connector/J 8 (the old com.mysql.jdbc.Driver name is deprecated)
df = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/Tweets") \
    .option("driver", "com.mysql.cj.jdbc.Driver").option("dbtable", "Tweets") \
    .option("user", "root").option("password", "password").load()

df.show()

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nop

                                                                                

#### Data engineering using pyspark

In [131]:
row_count = df.count() #count once and reuse the result
if row_count > 1000000:
    print(f"{row_count} ... That's a lot of Data!!")



1600000 ... That's a lot of Data!!


                                                                                

In [132]:
#check unique values for _c3
df.select('_c3').distinct().collect()
#field _c3 only has 1 value, dropping field

                                                                                

[Row(_c3='NO_QUERY')]

In [133]:
df = df.drop(df._c3)

In [134]:
#checking for duplicates
df.groupby("_c1").count().where("count > 1").show()

+----------+-----+
|       _c1|count|
+----------+-----+
|1468544973|    2|
|1690908358|    2|
|1834777946|    2|
|1882160717|    2|
|1965601765|    2|
|1982434182|    2|
|2002309001|    2|
|2190980212|    2|
|1685304801|    2|
|1686371908|    2|
|1957194329|    2|
|1969964899|    2|
|1974268607|    2|
|2056807406|    2|
|2063670799|    2|
|1556266702|    2|
|1752414405|    2|
|1824843992|    2|
|1881996107|    2|
|1983726537|    2|
+----------+-----+
only showing top 20 rows



                                                                                

In [135]:
duplicates = df.groupby("_c1").count().where("count > 1").drop("count")
print(f"Number of duplicates: {duplicates.count()}")



Number of duplicates: 1685


                                                                                

In [136]:
#show 1 duplicate example
df[df["_c1"] == 1983726537].show(truncate=False)

+-------+----------+----------------------------+-------+---------------------------------------------------------------------------------------------+
|_c0    |_c1       |_c2                         |_c4    |_c5                                                                                          |
+-------+----------+----------------------------+-------+---------------------------------------------------------------------------------------------+
|252393 |1983726537|Sun May 31 13:42:57 PDT 2009|iargent|Should have gone on a bike ride today but never quite happened  Still enjoyed the sun though |
|1190503|1983726537|Sun May 31 13:42:57 PDT 2009|iargent|Should have gone on a bike ride today but never quite happened  Still enjoyed the sun though |
+-------+----------+----------------------------+-------+---------------------------------------------------------------------------------------------+



                                                                                

In [137]:
df = df.dropDuplicates(['_c1'])
df.count() #checking how many rows are left after dropping duplicates

                                                                                

1598315
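
To make the duplicate handling concrete, here is a small plain-Python sketch of the same two steps on made-up rows. It is an illustration only: the toy rows are invented, and the sketch keeps the first row it sees for each id, whereas Spark does not promise which of the duplicated rows survives `dropDuplicates`. The last line checks the arithmetic of the counts reported above, assuming each of the 1685 ids appears exactly twice, as in the rows shown.

```python
from collections import Counter

# toy rows: (row_number, tweet_id); tweet id 11 appears twice
rows = [(0, 10), (1, 11), (2, 12), (3, 11)]

# equivalent of groupby("_c1").count().where("count > 1")
id_counts = Counter(tweet_id for _, tweet_id in rows)
duplicated_ids = [tweet_id for tweet_id, n in id_counts.items() if n > 1]

# equivalent of dropDuplicates(["_c1"]): keep one row per tweet id
seen = set()
deduplicated = []
for row in rows:
    if row[1] not in seen:
        seen.add(row[1])
        deduplicated.append(row)

# 1,600,000 rows minus one extra copy for each of the 1685 duplicated ids
expected_rows = 1_600_000 - 1_685
```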

In [138]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: long (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



#### Dealing with timestamps

In [139]:
#example: 
df.first()["_c2"] #PDT stands for Pacific time zone

                                                                                

'Mon Apr 06 22:32:38 PDT 2009'

In [140]:
#let's check if all timestamps are in PDT
#if every string contains PDT, this filter returns an empty list
df.filter(~df["_c2"].contains("PDT")).collect()

[]

In [141]:
#all timestamps are PDT
from pyspark.sql.functions import to_timestamp
#the LEGACY parser is needed because the default parser raised an error on this pattern
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
#spark.conf.set("spark.sql.legacy.timeParserPolicy","CORRECTED") #switches back to the new (non-legacy) parser

Time_Format = "E MMM d HH:mm:ss z yyyy"
df = df.withColumn("Timestamp", to_timestamp(df["_c2"], Time_Format))
df = df.drop("_c2", "_c0") #the raw timestamp string and the row number are no longer needed
df.show(5)

+----------+-------------+--------------------+-------------------+
|       _c1|          _c4|                 _c5|          Timestamp|
+----------+-------------+--------------------+-------------------+
|1467860144|     Jana1976|@JonathanRKnight ...|2009-04-07 06:32:38|
|1467862225|        hdm42|@vjl also, your w...|2009-04-07 06:33:11|
|1467889791|jennhelvering|Just called Hills...|2009-04-07 06:40:33|
|1467898027|   twitrbug81|@JonathanRKnight ...|2009-04-07 06:42:49|
|1467904302|bsbnumber1fan|@nick_carter Aww ...|2009-04-07 06:44:34|
+----------+-------------+--------------------+-------------------+
only showing top 5 rows
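
As a cross-check of what the pattern `E MMM d HH:mm:ss z yyyy` does, the same string can be parsed in plain Python. This sketch assumes PDT is a fixed UTC-7 offset and strips the abbreviation before parsing, because `%Z` support varies by platform; `parse_pdt` is a hypothetical helper, not part of the notebook.

```python
from datetime import datetime, timedelta, timezone

# assumption: PDT is treated as a fixed offset of UTC-7
PDT = timezone(timedelta(hours=-7), "PDT")

def parse_pdt(raw):
    """Parse a string such as 'Mon Apr 06 22:32:38 PDT 2009' into an aware datetime."""
    naive = datetime.strptime(raw.replace(" PDT", ""), "%a %b %d %H:%M:%S %Y")
    return naive.replace(tzinfo=PDT)

local_time = parse_pdt("Mon Apr 06 22:32:38 PDT 2009")
utc_time = local_time.astimezone(timezone.utc)
print(utc_time.strftime("%Y-%m-%d %H:%M:%S"))
```

The UTC value is 05:32:38 on 2009-04-07. Spark stores timestamps as instants and renders them in the session time zone, so the 06:32:38 shown for this tweet would be consistent with a session one hour ahead of UTC. That is an inference from the output, not something the notebook states.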



In [142]:
#remove hashtags (the pattern only matches a hashtag that is followed by whitespace)
from pyspark.sql import functions as f
df = df.withColumn("Text",f.regexp_replace("_c5",r"#([^\s]+)\s",""))

In [143]:
#remove mentions (the pattern only matches a mention that is followed by whitespace)
df = df.withColumn("Text",f.regexp_replace("Text",r"@([^\s]+)\s",""))
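
Both patterns require a whitespace character after the tag, so a hashtag or mention at the very end of a tweet is left in place. The sketch below shows this with Python's `re` module, which treats these simple patterns the same way as the Java regex engine behind `regexp_replace`. The `TAG_OR_MENTION` pattern is an assumed alternative, not what the notebook ran, and switching to it would change the cleaned text and therefore the scores.

```python
import re

HASHTAG = r"#([^\s]+)\s"         # pattern used in the notebook
MENTION = r"@([^\s]+)\s"         # pattern used in the notebook
TAG_OR_MENTION = r"[#@]\S+\s*"   # assumed alternative: trailing whitespace is optional

def clean_original(text):
    """Apply the two notebook patterns in the same order as the cells above."""
    return re.sub(MENTION, "", re.sub(HASHTAG, "", text))

def clean_alternative(text):
    """Remove hashtags and mentions anywhere in the text, including at the end."""
    return re.sub(TAG_OR_MENTION, "", text).strip()

sample = "@bob great day #sun"
print(repr(clean_original(sample)))
print(repr(clean_alternative(sample)))
```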

In [144]:
#drop columns that are no longer needed
df = df.drop("_c5", "_c4")

In [145]:
#renaming
df = df.withColumnRenamed("_c1","id")
df = df.withColumnRenamed("Timestamp","timestamp")
df = df.withColumnRenamed("Text","text")

In [146]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- text: string (nullable = true)



In [147]:
#Sentiment extraction
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from nltk.sentiment import SentimentIntensityAnalyzer
import nltk
nltk.download('vader_lexicon')
sia = SentimentIntensityAnalyzer()

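def vader_compound(text):
    #return the VADER compound score, or None when the text is missing
    if text is None:
        return None
    return sia.polarity_scores(text)["compound"]

# Register the VADER function as a UDF (User-Defined Function)
vader_udf = udf(vader_compound, DoubleType())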

# Add the 'sentiment' column to the DataFrame using the UDF
df = df.withColumn("sentiment", vader_udf(df["text"]))

df.printSchema()

root
 |-- id: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- text: string (nullable = true)
 |-- sentiment: double (nullable = true)



[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/hduser/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!
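
The compound score lies between -1 and 1. For reading the values, a small helper can map a score to a coarse label. The cut-offs of ±0.05 are the ones commonly suggested in VADER's documentation; applying them here is an assumption, since the notebook only stores the raw score, and `label_compound` is a hypothetical name.

```python
def label_compound(score, threshold=0.05):
    """Map a VADER compound score in [-1, 1] to a coarse label (hypothetical helper)."""
    if score is None:
        return None
    if score >= threshold:
        return "positive"
    if score <= -threshold:
        return "negative"
    return "neutral"

print(label_compound(0.4))
```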


In [148]:
from pyspark.sql.functions import to_date, mean

# Group by day and calculate the mean score
result_df = df.groupBy(to_date("timestamp").alias("Day")).agg(mean("sentiment").alias("score"))
result_df = result_df.sort("Day")
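
The aggregation above is a group-by on the calendar day followed by a mean. On a handful of rows the same logic can be written in plain Python; the timestamps and scores below are made up for illustration.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean

# toy (timestamp, sentiment) pairs; the values are invented
scored = [
    (datetime(2009, 4, 7, 6, 32, 38), 0.5),
    (datetime(2009, 4, 7, 6, 33, 11), -0.25),
    (datetime(2009, 4, 8, 9, 0, 0), 0.25),
]

# group the scores by calendar day, like groupBy(to_date("timestamp"))
by_day = defaultdict(list)
for ts, score in scored:
    by_day[ts.date()].append(score)

# mean score per day, sorted by day like result_df.sort("Day")
daily_mean = {day: mean(scores) for day, scores in sorted(by_day.items())}
```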

In [149]:
result_df.createOrReplaceTempView("aggregate")
res = spark.sql("SELECT min(Day) from aggregate") #first day; res is reassigned below, so only the last day is displayed
res = spark.sql("SELECT max(Day) from aggregate") #last day
res.show()



+----------+
|  max(Day)|
+----------+
|2009-06-25|
+----------+





In [150]:
#create dataframe with all dates between min and max
import pandas as pd

#one string per calendar day between the first and last day of the data
dates = [date.strftime("%Y-%m-%d") for date in pd.date_range(start="2009-04-07",end="2009-06-25")]

date_df = spark.createDataFrame(dates,"string").toDF("Day")

In [151]:
result_df.printSchema()

root
 |-- Day: date (nullable = true)
 |-- score: double (nullable = true)



In [152]:
date_df.printSchema()

root
 |-- Day: string (nullable = true)



In [153]:
#left join so that days without tweets are kept, with a null score
output = date_df.join(result_df,on='Day',how='left').sort("Day")

In [154]:
output.show(5)



+----------+-------------------+
|       Day|              score|
+----------+-------------------+
|2009-04-07|0.15867049362980187|
|2009-04-08|               null|
|2009-04-09|               null|
|2009-04-10|               null|
|2009-04-11|               null|
+----------+-------------------+
only showing top 5 rows
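
The calendar and the left join can be mimicked in plain Python to see why the gaps appear as nulls. In this sketch only the score for 2009-04-07 is taken from the output above; everything else is a stand-in.

```python
from datetime import date, timedelta

start, end = date(2009, 4, 7), date(2009, 6, 25)

# one entry per calendar day, both ends included
calendar = [start + timedelta(days=i) for i in range((end - start).days + 1)]

# daily scores keyed by day; only this first value comes from the notebook output
daily_scores = {date(2009, 4, 7): 0.15867049362980187}

# left join: every calendar day is kept, days without a score get None
joined = [(day, daily_scores.get(day)) for day in calendar]
```

The calendar covers 80 days, so the joined result has 80 rows no matter how many days actually contain tweets.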



                                                                                

In [155]:
#rename the column to match the column name in Cassandra
output = output.withColumnRenamed("Day","day")

In [156]:
#session for Cassandra (getOrCreate returns the session created earlier if it is still active)
spark = SparkSession.builder\
  .appName("MyApp") \
  .master("local[*]")\
  .getOrCreate()

#write into Cassandra
#the Cassandra keyspace and table are already created (done through the CLI)
output.write\
  .format('org.apache.spark.sql.cassandra')\
  .mode('append')\
  .options(table='tweets_final',keyspace='tweets_final')\
  .save()

                                                                                

In [157]:
#read from cassandra
last = spark.read\
  .format('org.apache.spark.sql.cassandra')\
  .options(table='tweets_final',keyspace='tweets_final')\
  .load()
last = last.sort("day")
last.show(2)

+-------------------+--------------------+
|                day|               score|
+-------------------+--------------------+
|2009-04-07 00:00:00|0.158670493629801870|
|2009-04-08 00:00:00|                null|
+-------------------+--------------------+
only showing top 2 rows



In [158]:
last.tail(5)

[Row(day=datetime.datetime(2009, 6, 21, 0, 0), score=Decimal('-0.030768202243756308')),
 Row(day=datetime.datetime(2009, 6, 22, 0, 0), score=Decimal('-0.052183901689708160')),
 Row(day=datetime.datetime(2009, 6, 23, 0, 0), score=Decimal('-0.058783732858222630')),
 Row(day=datetime.datetime(2009, 6, 24, 0, 0), score=Decimal('-0.058545564212113634')),
 Row(day=datetime.datetime(2009, 6, 25, 0, 0), score=Decimal('-0.051633178163351674'))]

In [170]:
#the resulting csv output is then moved from the virtual machine to my personal machine for the modelling part
last.write.csv("Downloads/final_output.csv",header=True)
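
Note that `write.csv` creates a directory containing one `part-*.csv` file per partition rather than a single file, and with `header=True` each part file carries its own header. A possible way to end up with one file is to concatenate the parts afterwards. `merge_part_files` is a hypothetical helper written for this sketch, and it assumes the part file names sort in partition order.

```python
import glob
import os

def merge_part_files(spark_output_dir, target_file):
    """Concatenate Spark's part-*.csv files into one CSV, keeping a single header."""
    part_files = sorted(glob.glob(os.path.join(spark_output_dir, "part-*.csv")))
    header_written = False
    with open(target_file, "w", encoding="utf-8", newline="") as out:
        for path in part_files:
            with open(path, encoding="utf-8", newline="") as part:
                header = part.readline()  # first line of every part is the header
                if header and not header_written:
                    out.write(header)
                    header_written = True
                for line in part:
                    out.write(line)
    return len(part_files)

# Usage (not executed here):
# merge_part_files("Downloads/final_output.csv", "Downloads/final_output_single.csv")
```

Another option is `last.coalesce(1).write.csv(...)`, which still writes a directory but with a single part file inside it.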

                                                                                