In [1]:
import numpy as np              #For handling arrays
import pandas as pd             # For handling data
import os
import shutil

import matplotlib.pyplot as plt

%matplotlib inline

#from tensorflow.keras.preprocessing.image import ImageDataGenerator
#from tensorflow.keras.models import Sequential
#from tensorflow.keras.layers import Dense, Conv2D, Flatten, MaxPooling2D
#from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
#from tensorflow.keras.utils import plot_model
#from sklearn.utils.class_weight import compute_class_weight

import warnings
warnings.filterwarnings('ignore')

In [2]:
# Path to data set
csv_file = "file:///home/hduser/Downloads/work2/ProjectTweets.csv"

# Data Understanding

In [40]:
#Import pyspark SQL
from pyspark.sql import SparkSession        

# Create a SparkSession
spark = (SparkSession
  .builder
  .appName("SparkSQL")
  .getOrCreate())

# Read and create a temporary view
# The dataset doesnt contain header, so header = false
# toDF to define appropriated column name
dfTwitter = (spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "false")
  .load(csv_file)
  .toDF('id', 'seq', 'date', 'query', 'user', 'tweet'))

dfTwitter.createOrReplaceTempView("tblTempTwitter")




In [41]:
spark.sql("""SELECT * FROM tblTempTwitter""").show(10)

+---+----------+--------------------+--------+---------------+--------------------+
| id|       seq|                date|   query|           user|               tweet|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nop

In [42]:
spark.sql("""SELECT * FROM tblTempTwitter where query != 'NO_QUERY'""").show(10)

+---+---+----+-----+----+-----+
| id|seq|date|query|user|tweet|
+---+---+----+-----+----+-----+
+---+---+----+-----+----+-----+



In [43]:
spark.sql("""SELECT user, COUNT(user) as total FROM tblTempTwitter GROUP BY user ORDER BY total desc;""").show(20)



[Stage 49:>                                                         (0 + 6) / 6]

+---------------+-----+
|           user|total|
+---------------+-----+
|       lost_dog|  549|
|        webwoke|  345|
|       tweetpet|  310|
|SallytheShizzle|  281|
|    VioletsCRUK|  279|
|    mcraddictal|  276|
|       tsarnick|  248|
|    what_bugs_u|  246|
|    Karen230683|  238|
|      DarkPiano|  236|
|   SongoftheOss|  227|
|      Jayme1988|  225|
|         keza34|  219|
| ramdomthoughts|  216|
|      shanajaca|  213|
|         wowlew|  212|
|   TraceyHewins|  211|
|     nuttychris|  211|
|   thisgoeshere|  207|
|     Spidersamm|  205|
+---------------+-----+
only showing top 20 rows



                                                                                

Looking for null or blank date values

In [53]:
spark.sql("""SELECT user, tweet FROM tblTempTwitter where date is null or date =='';""").show(20)


[Stage 60:>                                                         (0 + 1) / 1]

+----+-----+
|user|tweet|
+----+-----+
+----+-----+



                                                                                

# Data Preparation

### SPARK HIVE


Creating a Database in Hive Metastore 

In [44]:
#Create a Database dbTwitter in Hive
spark.sql("CREATE DATABASE IF NOT EXISTS dbTwitter")



2023-10-18 06:55:47,746 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2023-10-18 06:55:47,746 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2023-10-18 06:55:50,981 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
2023-10-18 06:55:50,982 WARN metastore.ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore hduser@127.0.1.1
2023-10-18 06:55:51,010 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
2023-10-18 06:55:51,392 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
2023-10-18 06:55:51,398 WARN metastore.ObjectStore: Failed to get database dbtwitter, returning NoSuchObjectException


DataFrame[]

Using spark.sql() method "CREATE TABLE" to create a table in Hive from the spark temporary view tblTempTwitter.

In [46]:
#Create a Table in Hive tblTwitter on the bdTwitter database.
spark.sql("CREATE TABLE IF NOT EXISTS dbTwitter.tblTwitter (id Int, seq Double, date String, query String, user String, tweet String)")



2023-10-18 06:57:39,136 WARN analysis.ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.
2023-10-18 06:57:39,233 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
2023-10-18 06:57:39,361 WARN conf.HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
2023-10-18 06:57:39,361 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2023-10-18 06:57:39,362 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2023-10-18 06:57:39,372 WARN metastore.HiveMetaStore: Location: file:/home/hduser/Downloads/work2/spark-warehouse/dbtwitter.db/tbltwitter specified for non-external table:tbltwitter


DataFrame[]

Inserting data from the spark temporary view tblTempTwitter into the Hive table tblTwitter:

In [48]:
#Insert into Hive tblTwitter using the spar temp view tblTempTwitter. 
spark.sql("INSERT INTO TABLE dbTwitter.tblTwitter SELECT * FROM tblTempTwitter")



                                                                                

DataFrame[]

In [50]:
#Lets view the data in the hive table
spark.sql("SELECT * FROM dbTwitter.tblTwitter").show(10)

+------+-------------+--------------------+--------+---------------+--------------------+
|    id|          seq|                date|   query|           user|               tweet|
+------+-------------+--------------------+--------+---------------+--------------------+
|545133|2.201337104E9|Tue Jun 16 20:08:...|NO_QUERY|      alt_ducky|@miss_clariss oh ...|
|545134|2.201337108E9|Tue Jun 16 20:08:...|NO_QUERY|     CourtneyVR|Failed my WOF. Wi...|
|545135|2.201337287E9|Tue Jun 16 20:08:...|NO_QUERY|    melissaholt|Watching the firs...|
|545136|2.201337425E9|Tue Jun 16 20:08:...|NO_QUERY|       itznesha|my computer is in...|
|545137|2.201337512E9|Tue Jun 16 20:08:...|NO_QUERY|    lovinmyboys|Worked out my upp...|
|545138|2.201337757E9|Tue Jun 16 20:08:...|NO_QUERY|     mikerbrant|OMG I got my new ...|
|545139|2.201338077E9|Tue Jun 16 20:08:...|NO_QUERY|         daulex|my back has flare...|
|545140|2.201338113E9|Tue Jun 16 20:08:...|NO_QUERY|    CaliHeather|I am starting to ...|
|545141|2.

### MySQL

In [62]:
# Imports
from pyspark.sql import SparkSession

sparkMySQL = (SparkSession
  .builder
  .appName("SparkMySQL")
  .config("spark.jars", "mysql-connector-java-8.1.0.jar")
  .getOrCreate())

#spark = SparkSession.builder \
#  .appName("MyApp") \
#  .config("spark.jars", "mysql-connector-java-8.0.27.jar") \
#  .config("spark.driver.extraClassPath", "mysql-connector-java-8.0.27.jar") \
#  .getOrCreate()


In [65]:
#pip install ipython-sql

In [70]:
pip install mysqlclient

Collecting mysqlclient
  Downloading mysqlclient-2.2.0.tar.gz (89 kB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.5/89.5 kB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25lerror
  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32mGetting requirements to build wheel[0m did not run successfully.
  [31m│[0m exit code: [1;36m1[0m
  [31m╰─>[0m [31m[27 lines of output][0m
  [31m   [0m /bin/sh: 1: pkg-config: not found
  [31m   [0m /bin/sh: 1: pkg-config: not found
  [31m   [0m Trying pkg-config --exists mysqlclient
  [31m   [0m Command 'pkg-config --exists mysqlclient' returned non-zero exit status 127.
  [31m   [0m Trying pkg-config --exists mariadb
  [31m   [0m Command 'pkg-config --exists mariadb' returned non-zero exit status 127.
  [31m   [0m Traceback (most recent call last):
  [31m   [0m   File "/ho

In [68]:
pip install mysql-connector-python

In [69]:
pip install pymysql

ModuleNotFoundError: No module named 'MySQLdb'

In [66]:
import mysql.connector

db_connection = mysql.connector.connect(user="hduser", password="Pass@word1")
db_cursor = db_connection.cursor()
db_cursor.execute("CREATE DATABASE IF NOT EXISTS dbTwitter;")
db_cursor.execute("USE dbTwitter;")

db_cursor.execute("CREATE TABLE IF NOT EXISTS dbTwitter.tblTwitter (id Int, seq Double, date String, query String, user String, tweet String)")


ModuleNotFoundError: No module named 'mysql'

In [None]:
dfTwitter.write 
  .format("jdbc") 
  .option("driver","com.mysql.cj.jdbc.Driver") 
  .option("url", jdbcUrl)
  .option("dbtable", "tblTwitter") 
  .option("user", username) 
  .option("password", password)
  .save()

In [None]:
jdbcUrl = "jdbc:mysql://<host>:<port>/<database>"
username = "hduser"
password = "Pass@word1"

df = spark.read 
  .format("jdbc") 
  .option("url", jdbcUrl) 
  .option("user", username) 
  .option("password", password) 
  .option("dbtable", "<table>") 
  .option("inferSchema", "true") 
  .load()
df.show()