*** @Author: Amar Pawar ***

*** @Date: 2021-08-23 ***

*** @Last Modified by: Amar Pawar ***

*** @Last Modified time: 2021-08-23 ***

*** @Title : Storing data from spark dataframe to MySQL, and from MySQL to pyspark ***

# *PYSPARK DATAFRAMES FROM CSVs*

In [None]:
from pyspark.sql import *
import mysql.connector
import pandas as pd
from pyspark.sql import SparkSession
appName = "PySpark to MySQL"
master = "local"
spark = SparkSession.builder.master(master).appName(appName).getOrCreate()

## Loadind csv file and creating dataframe

In [3]:
Df= spark.read.format("csv").option("header","true").load("hdfs://localhost:9000/Spark/CpuLogData/*.csv")



## counting total entries in dataframe

In [4]:
Df.select("user_name","DateTime","boot_time","keyboard","mouse").show()

+--------------------+-------------------+--------------+--------+------+
|           user_name|           DateTime|     boot_time|keyboard| mouse|
+--------------------+-------------------+--------------+--------+------+
|  iamnzm@outlook.com|2019-09-19 08:40:02|0:09:59.262105|     1.0|  32.0|
|  iamnzm@outlook.com|2019-09-19 08:45:02|0:14:59.259253|     0.0|   0.0|
|  iamnzm@outlook.com|2019-09-19 08:50:01|0:19:58.817858|     0.0|   0.0|
|  iamnzm@outlook.com|2019-09-19 08:55:01|0:24:58.366251|    11.0| 900.0|
|  iamnzm@outlook.com|2019-09-19 09:00:01|0:29:59.008276|     2.0|  25.0|
|  iamnzm@outlook.com|2019-09-19 09:05:01|0:34:58.858791|    37.0| 336.0|
|deepshukla292@gma...|2019-09-19 09:05:01|0:05:19.424878|     0.0|  55.0|
|  iamnzm@outlook.com|2019-09-19 09:10:01|0:39:58.482956|     0.0| 136.0|
|deepshukla292@gma...|2019-09-19 09:10:01|0:10:19.516467|     6.0|1112.0|
|  iamnzm@outlook.com|2019-09-19 09:15:02|0:44:59.088574|     0.0|  84.0|
|deepshukla292@gma...|2019-09-19 09:15

## Creating temp view to perform sql quieries

In [5]:
Df.createOrReplaceTempView("log_data")

2021-08-23 20:13:06,829 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


## Query to fetch total entries for each person

In [6]:
avg_hr_df= spark.sql("select user_name ,count('') as total from log_data where keyboard !=0.0 or mouse!=0.0 group by user_name")

In [7]:
avg_hr_df.show()



+--------------------+-----+
|           user_name|total|
+--------------------+-----+
|salinabodale73@gm...|  440|
|sharlawar77@gmail...|  457|
|rahilstar11@gmail...|  399|
|deepshukla292@gma...|  475|
|  iamnzm@outlook.com|  459|
|markfernandes66@g...|  389|
|damodharn21@gmail...|  191|
|bhagyashrichalke2...|  361|
+--------------------+-----+





## Storing data from spark dataframe to MySQL table

In [8]:
import os
from logging_handler import logger
from dotenv import load_dotenv
load_dotenv('.env')

True

In [9]:
database=os.getenv("DB_NAME")
user=os.getenv("DB_USER")
password=os.getenv("DB_PASSWORD")
host=os.getenv("DB_HOST")
auth_plugin=os.getenv('AUTH_PLUGIN')
logger.info(database)

In [10]:
try:
    jdb_curl = f"jdbc:mysql://localhost:3306/" + database
    #write the dataframe into a sql table
    avg_hr_df.write.format("jdbc").option("url",jdb_curl) \
        .mode("overwrite") \
        .option("driver", "com.mysql.jdbc.Driver").option("dbtable","record_count") \
        .option("user",user).option("password",password).save()
    logger.info("written to mysql db successfully")
except Exception as e:
    logger.info(e)

Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.


## Fetching data from MySQL table to pyspark dataframe

In [11]:
import mysql.connector
import pandas as pd
try:
    conn = mysql.connector.connect(user=user, database=database,
                                password=password,
                                host=host,
                                port=3306,
                                    auth_plugin=auth_plugin)
    cursor = conn.cursor()
    query = "SELECT * FROM record_count"
    #Create a pandas dataframe
    pdf = pd.read_sql(query, con=conn)
    conn.close()

    # Convert Pandas dataframe to spark DataFrame
    df = spark.createDataFrame(pdf)
except Exception as e:
    logger.info(e)

df.show()



+--------------------+-----+
|           user_name|total|
+--------------------+-----+
|salinabodale73@gm...|  440|
|sharlawar77@gmail...|  457|
|rahilstar11@gmail...|  399|
|deepshukla292@gma...|  475|
|  iamnzm@outlook.com|  459|
|markfernandes66@g...|  389|
|damodharn21@gmail...|  191|
|bhagyashrichalke2...|  361|
+--------------------+-----+



