# Big Data Mysql and Pyspark

## Start session Pyspark

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import warnings 
warnings.filterwarnings("ignore")

In [2]:
#start spark session
spark = SparkSession.builder \
    .appName("Pyspark to MySQL") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()


24/05/19 14:25:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Loading the data from Hadoop using Pyspark

In [6]:
# Define schema for the DataFrame
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType

schema = StructType([
    StructField("ID", LongType(), True),
    StructField("unknown", StringType(), True),
    StructField("date", TimestampType(), True),
    StructField("flag", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True)
])

# Read the data with the correct timestampFormat fron hadoop
tweets_df = spark.read.csv("hdfs:///CA2/ProjectTweets.csv", header=False, schema=schema, sep=",", timestampFormat="EEE MMM dd HH:mm:ss zzz yyyy")

# Show the data
tweets_df.show()


+---+----------+-------------------+--------+---------------+--------------------+
| ID|   unknown|               date|    flag|           user|                text|
+---+----------+-------------------+--------+---------------+--------------------+
|  0|1467810369|2009-04-07 06:19:45|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|2009-04-07 06:19:49|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|2009-04-07 06:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|2009-04-07 06:19:57|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|2009-04-07 06:19:57|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|2009-04-07 06:20:00|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|2009-04-07 06:20:03|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|2009-04-07 06:20:03|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|2009-04-07 06:20:05|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|  9

In [7]:
#Check Schema to check data types and structure of the data
tweets_df.printSchema()


root
 |-- ID: long (nullable = true)
 |-- unknown: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



# Read from MySQ

In [8]:
#Let's check if we can read the mysql database


# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Read from MySQL") \
    .getOrCreate()

# Define database connection properties
url = "jdbc:mysql://localhost:3306/Tweets"
properties = {
    "driver": "com.mysql.cj.jdbc.Driver",
    "user": "root",
    "password": "password"
}

# Define the table you want to read
table = "tweet_data"

# Read data from the SQL database into a DataFrame
tweets_sql_df = spark.read.jdbc(url=url, table=table, properties=properties)

# Show the DataFrame
tweets_sql_df.show()

[Stage 3:>                                                          (0 + 1) / 1]

+---+----------+-------------------+--------+---------------+--------------------+
| ID|   unknown|               date|    flag|           user|                text|
+---+----------+-------------------+--------+---------------+--------------------+
|  0|1467810369|2009-04-07 06:19:45|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|2009-04-07 06:19:49|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|2009-04-07 06:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|2009-04-07 06:19:57|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|2009-04-07 06:19:57|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|2009-04-07 06:20:00|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|2009-04-07 06:20:03|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|2009-04-07 06:20:03|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|2009-04-07 06:20:05|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|  9

                                                                                

In [9]:
#Check is data is written in Mysql

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Write to MySQL") \
    .getOrCreate()

# Replace the connection parameters with your MySQL connection details
username = 'root'
password = 'password'
host = 'localhost'
database_name = 'Tweets'
table_name = 'tweet_data'

# Convert Spark DataFrame to temporary view
tweets_df.createOrReplaceTempView("tweets")

# Define JDBC URL
jdbc_url = f"jdbc:mysql://{host}/{database_name}"

# Define properties for the JDBC connection
connection_properties = {
    "user": username,
    "password": password,
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Write DataFrame to MySQL table
tweets_df.write.jdbc(url=jdbc_url, table=table_name, mode="overwrite", properties=connection_properties)

print("Data successfully written to MySQL")


24/05/19 14:26:47 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.




Data successfully written to MySQL


                                                                                

In [10]:
#Let's check if the data is filled in the Mysql
tweets_sql_df = spark.read.jdbc(url=url, table=table, properties=properties)

# Show the DataFrame
tweets_sql_df.show()

[Stage 5:>                                                          (0 + 1) / 1]

+---+----------+-------------------+--------+---------------+--------------------+
| ID|   unknown|               date|    flag|           user|                text|
+---+----------+-------------------+--------+---------------+--------------------+
|  0|1467810369|2009-04-07 06:19:45|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|2009-04-07 06:19:49|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|2009-04-07 06:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|2009-04-07 06:19:57|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|2009-04-07 06:19:57|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|2009-04-07 06:20:00|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|2009-04-07 06:20:03|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|2009-04-07 06:20:03|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|2009-04-07 06:20:05|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|  9

                                                                                

## Exploratory Data Analysis 

In [11]:
#Check Statistic of dataset
tweets_sql_df.describe().show()

[Stage 6:>                                                          (0 + 1) / 1]

+-------+-----------------+--------------------+--------+--------------------+--------------------+
|summary|               ID|             unknown|    flag|                user|                text|
+-------+-----------------+--------------------+--------+--------------------+--------------------+
|  count|          1600000|             1600000| 1600000|             1600000|             1600000|
|   mean|         799999.5|1.9988175522956276E9|    null| 4.325887521835714E9|                null|
| stddev|461880.3596892604|1.9357607362268892E8|    null|5.162733218454885E10|                null|
|    min|                0|          1467810369|NO_QUERY|        000catnap000|                 ...|
|    max|          1599999|          2329205794|NO_QUERY|          zzzzeus111|ï¿½ï¿½ï¿½ï¿½ï¿½ß§...|
+-------+-----------------+--------------------+--------+--------------------+--------------------+



                                                                                

In [14]:
from pyspark.sql.functions import col, min, max
from pyspark.sql.types import DateType
from datetime import timedelta


# Convert 'date' column to DateType 
tweets_sql_df = tweets_sql_df.withColumn("date", col("date").cast(DateType()))

# Find min and max dates
min_date = tweets_sql_df.selectExpr("min(date)").first()[0]
max_date = tweets_sql_df.selectExpr("max(date)").first()[0]

# Generate DataFrame with all dates within the range
date_range = [min_date + timedelta(days=x) for x in range((max_date - min_date).days + 1)]
date_tweets = spark.createDataFrame([(date,) for date in date_range], ["date"]).withColumn("date", col("date").cast(DateType()))

# Left join with the original DataFrame to find missing dates
missing_dates_df = date_tweets.join(tweets_sql_df, "date", "left_anti")

missing_dates_count = missing_dates_df.count()

if missing_dates_count > 0:
    print("Number of missing dates:", missing_dates_count)
    print("Actual missing dates:")
    missing_dates_df.show()
else:
    print("No missing dates found.")

                                                                                

Number of missing dates: 39
Actual missing dates:


[Stage 25:>                                                         (0 + 1) / 1]

+----------+
|      date|
+----------+
|2009-04-08|
|2009-04-09|
|2009-04-10|
|2009-04-11|
|2009-04-12|
|2009-04-13|
|2009-04-14|
|2009-04-15|
|2009-04-16|
|2009-04-17|
|2009-04-22|
|2009-04-23|
|2009-04-24|
|2009-04-25|
|2009-04-26|
|2009-04-27|
|2009-04-28|
|2009-04-29|
|2009-04-30|
|2009-05-01|
+----------+
only showing top 20 rows



                                                                                

In [15]:
#Check for duplicates based on user, date, and text
duplicates = tweets_sql_df.groupBy("user", "date", "text").count().where(col("count") > 1)

if duplicates.count() > 0:
    print("Duplicate rows based on user, date, and text:")
    duplicates.show()
else:
    print("No duplicates found.")

                                                                                

Duplicate rows based on user, date, and text:


[Stage 35:>                                                         (0 + 1) / 1]

+-------------+----------+--------------------+-----+
|         user|      date|                text|count|
+-------------+----------+--------------------+-----+
|       LouluS|2009-04-19|reading the islan...|    2|
|     tweetpet|2009-04-20|@astrid35  Clean Me!|    2|
|     eBlondie|2009-05-04|it's raining here...|    2|
|  elenaaaaaaa|2009-05-10|i'm dreading tomo...|    2|
|       Tayzor|2009-05-11|Heading back to L...|    2|
|  guiltfeeder|2009-05-17|@Keinessish Yeah ...|    2|
|XxSuperHansxX|2009-05-22|It's so warm.... ...|    2|
|  cbruton1975|2009-06-01|Found out last ni...|    2|
|Jeff_Hardyfan|2009-05-29|&quot;@mileycyrus...|    2|
|   HeyMoonday|2009-06-06|You tried your be...|    2|
|    belgarcia|2009-06-22|It's one of the l...|    2|
|    MsStaceyK|2009-06-16|   @muSicFienDkiCks |    2|
|     tweetpet|2009-04-21|@MidasJackson  Cl...|    2|
| raeraeverret|2009-05-03|Shreveport this w...|    2|
|      _Unica_|2009-05-04|Sweeeet Sunday..n...|    2|
|     diiilxia|2009-05-04|@D

                                                                                

In [16]:
# Check the distribution of tweet timestamps
tweets_sql_df.groupBy("date").count().orderBy("date").show()

[Stage 38:>                                                         (0 + 1) / 1]

+----------+------+
|      date| count|
+----------+------+
|2009-04-07| 20671|
|2009-04-18| 16132|
|2009-04-19| 33670|
|2009-04-20| 18447|
|2009-04-21| 11105|
|2009-05-02| 31096|
|2009-05-03| 25045|
|2009-05-04| 29823|
|2009-05-10| 31551|
|2009-05-11|  6217|
|2009-05-12|  4186|
|2009-05-14| 21526|
|2009-05-17| 41205|
|2009-05-18| 44564|
|2009-05-22| 41206|
|2009-05-24|   169|
|2009-05-25|   169|
|2009-05-27| 11619|
|2009-05-29| 55874|
|2009-05-30|103990|
+----------+------+
only showing top 20 rows



                                                                                

In [17]:
from pyspark.sql.functions import desc

# Group by date and count the number of tweets for each date
tweets_by_date = tweets_sql_df.groupBy("date").count()

# Order by count in descending order
tweets_by_date_sorted = tweets_by_date.orderBy(desc("count"))

# Show the top 10 days with the most tweets
top_10_days = tweets_by_date_sorted.limit(10)
top_10_days.show()


[Stage 41:>                                                         (0 + 1) / 1]

+----------+------+
|      date| count|
+----------+------+
|2009-06-07|112221|
|2009-05-31|105820|
|2009-06-06|104053|
|2009-05-30|103990|
|2009-06-01| 95479|
|2009-06-16| 90410|
|2009-06-02| 81633|
|2009-06-15| 78395|
|2009-06-03| 61265|
|2009-05-29| 55874|
+----------+------+



                                                                                

In [18]:
# Find top users who tweeted the most
top_users = tweets_sql_df.groupBy("user").count().orderBy("count", ascending=False)

# Show the top 10 users with the most tweets
top_10_users = top_users.limit(10)
top_10_users.show()


[Stage 44:>                                                         (0 + 1) / 1]

+---------------+-----+
|           user|count|
+---------------+-----+
|       lost_dog|  549|
|        webwoke|  345|
|       tweetpet|  310|
|SallytheShizzle|  281|
|    VioletsCRUK|  279|
|    mcraddictal|  276|
|       tsarnick|  248|
|    what_bugs_u|  246|
|    Karen230683|  238|
|      DarkPiano|  236|
+---------------+-----+



                                                                                

In [19]:
from pyspark.sql.functions import split, explode

# Perform frequency analysis in tweet text
word_counts = tweets_sql_df.select(explode(split("text", " ")).alias("word")) \
                      .groupBy("word").count() \
                      .orderBy("count", ascending=False)

# Show the top 10 most frequent words
top_10_words = word_counts.limit(10)
top_10_words.show()


[Stage 48:>                                                         (0 + 1) / 1]

+----+-------+
|word|  count|
+----+-------+
|    |1930617|
|  to| 552962|
|   I| 496608|
| the| 487500|
|   a| 366212|
|  my| 280025|
| and| 275263|
|   i| 249975|
|  is| 217692|
| you| 213871|
+----+-------+



                                                                                

In [20]:
tweets_sql_df.show()

[Stage 52:>                                                         (0 + 1) / 1]

+---+----------+----------+--------+---------------+--------------------+
| ID|   unknown|      date|    flag|           user|                text|
+---+----------+----------+--------+---------------+--------------------+
|  0|1467810369|2009-04-07|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|2009-04-07|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|2009-04-07|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|2009-04-07|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|2009-04-07|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|2009-04-07|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|2009-04-07|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|2009-04-07|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|2009-04-07|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|  9|1467812025|2009-04-07|NO_QUERY|        mimismo|@twittera que me ...|
| 10|1467812416|2009-04-07|NO_QUERY| e

                                                                                

## Dropping unnecesary columns

In [21]:
#Let's delete the unknown column
from pyspark.sql.functions import col

# Drop the 'unknown' and 'flag' columns
tweets_sql_df= tweets_sql_df.drop('unknown', 'flag')

# Show the updated DataFrame
tweets_sql_df.show()

[Stage 53:>                                                         (0 + 1) / 1]

+---+----------+---------------+--------------------+
| ID|      date|           user|                text|
+---+----------+---------------+--------------------+
|  0|2009-04-07|_TheSpecialOne_|@switchfoot http:...|
|  1|2009-04-07|  scotthamilton|is upset that he ...|
|  2|2009-04-07|       mattycus|@Kenichan I dived...|
|  3|2009-04-07|        ElleCTF|my whole body fee...|
|  4|2009-04-07|         Karoli|@nationwideclass ...|
|  5|2009-04-07|       joy_wolf|@Kwesidei not the...|
|  6|2009-04-07|        mybirch|         Need a hug |
|  7|2009-04-07|           coZZ|@LOLTrish hey  lo...|
|  8|2009-04-07|2Hood4Hollywood|@Tatiana_K nope t...|
|  9|2009-04-07|        mimismo|@twittera que me ...|
| 10|2009-04-07| erinx3leannexo|spring break in p...|
| 11|2009-04-07|   pardonlauren|I just re-pierced...|
| 12|2009-04-07|           TLeC|@caregiving I cou...|
| 13|2009-04-07|robrobbierobert|@octolinz16 It it...|
| 14|2009-04-07|    bayofwolves|@smarrison i woul...|
| 15|2009-04-07|     HairByJ

                                                                                

# Overwrite the data in MySQL 

In [23]:
# Overwrite the table in MySQL with the updated DataFrame
tweets_sql_df.write.jdbc(url=jdbc_url, table="tweets_mysql_processed", mode="overwrite", properties=connection_properties)


                                                                                

In [24]:
#Read data from the MySQL table into a Spark DataFrame to Check if data was correctly created in mysql
new_data_df = spark.read.jdbc(url=jdbc_url, table="tweets_mysql_processed", properties=connection_properties)

# Show the DataFrame
new_data_df.show()


[Stage 56:>                                                         (0 + 1) / 1]

+---+----------+---------------+--------------------+
| ID|      date|           user|                text|
+---+----------+---------------+--------------------+
|  0|2009-04-07|_TheSpecialOne_|@switchfoot http:...|
|  1|2009-04-07|  scotthamilton|is upset that he ...|
|  2|2009-04-07|       mattycus|@Kenichan I dived...|
|  3|2009-04-07|        ElleCTF|my whole body fee...|
|  4|2009-04-07|         Karoli|@nationwideclass ...|
|  5|2009-04-07|       joy_wolf|@Kwesidei not the...|
|  6|2009-04-07|        mybirch|         Need a hug |
|  7|2009-04-07|           coZZ|@LOLTrish hey  lo...|
|  8|2009-04-07|2Hood4Hollywood|@Tatiana_K nope t...|
|  9|2009-04-07|        mimismo|@twittera que me ...|
| 10|2009-04-07| erinx3leannexo|spring break in p...|
| 11|2009-04-07|   pardonlauren|I just re-pierced...|
| 12|2009-04-07|           TLeC|@caregiving I cou...|
| 13|2009-04-07|robrobbierobert|@octolinz16 It it...|
| 14|2009-04-07|    bayofwolves|@smarrison i woul...|
| 15|2009-04-07|     HairByJ

                                                                                

In [25]:
# Define the output path for the CSV file
output_path = "/home/hduser/Desktop/adv-data-big-data-ft-ca2-Paolacrir/tweets_mysql_processed.csv"

# Write the DataFrame to a CSV file
new_data_df.write.csv(output_path, header=True, mode="overwrite")

print("DataFrame has been successfully exported to CSV.")


[Stage 57:>                                                         (0 + 1) / 1]

DataFrame has been successfully exported to CSV.


                                                                                

In [26]:
# Convert the Spark DataFrame to a pandas DataFrame
pandas_df = new_data_df.toPandas()

# Define the output path for the CSV file
output_path = "/home/hduser/Desktop/adv-data-big-data-ft-ca2-Paolacrir/tweets_mysql_processed.csv"

# Save the pandas DataFrame to a CSV file
pandas_df.to_csv(output_path, index=False)

print(f"DataFrame has been successfully exported to CSV: {output_path}")


                                                                                

DataFrame has been successfully exported to CSV: /home/hduser/Desktop/adv-data-big-data-ft-ca2-Paolacrir/tweets_mysql_processed.csv


In [28]:
#Finish  Sparksession
spark.stop()