# Load TeslaFi data into MySQL/MariaDB or Spark SQL using PySpark

During the data exploration or descriptive analytics phase of a project, it is very useful to be able to query your CSV files more or less directly with SQL.

Spark makes this simple by creating tables in Hive that reference the CSV files in place. Besides SQL, this gives us the capabilities of Spark and pandas DataFrames.

Spark also lets us create permanent tables in Hive stored in the efficient, columnar Parquet file format.
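
The two ideas above (a Hive table over the CSVs, then a permanent Parquet copy) map onto two Spark SQL statements. A minimal sketch, assuming a Hive-enabled `SparkSession` named `spark`: the helpers `csv_table_ddl` and `parquet_copy_ddl` are hypothetical and only build the SQL text, so they can be checked without a cluster.

```python
def _sql_string(value: str) -> str:
    """Quote a Spark SQL string literal, escaping backslashes and single quotes."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def csv_table_ddl(table: str, path: str, header: bool = True,
                  infer_schema: bool = True, sep: str = ",") -> str:
    """CREATE TABLE statement for a table that reads the CSV file(s) at `path`.

    Only the location and read options go into the metastore;
    the CSV data itself stays where it is.
    """
    options = {
        "path": path,
        "header": str(header).lower(),
        "inferSchema": str(infer_schema).lower(),
        "sep": sep,
    }
    opts = ", ".join(f"{key}={_sql_string(val)}" for key, val in options.items())
    return f"CREATE TABLE IF NOT EXISTS {table} USING csv OPTIONS ({opts})"


def parquet_copy_ddl(source: str, target: str) -> str:
    """CREATE TABLE ... AS SELECT statement that copies `source` into Parquet."""
    return f"CREATE TABLE IF NOT EXISTS {target} USING parquet AS SELECT * FROM {source}"
```

On a session built with `SparkSession.builder.enableHiveSupport().getOrCreate()` (or on Databricks), you would run `spark.sql(csv_table_ddl('trips', '/data/data-files/teslafi/trips.csv'))` followed by `spark.sql(parquet_copy_ddl('trips', 'trips_parquet'))`; the names `trips`, `trips.csv` and `trips_parquet` are placeholders.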

If we also want the data in a database outside the Spark environment, we can save those DataFrames to MySQL/MariaDB or any other JDBC-compliant database.
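
As a minimal sketch of what such a JDBC write needs, assuming the MariaDB Connector/J driver (`org.mariadb.jdbc.Driver`) is on Spark's classpath: the hypothetical helper `jdbc_options` collects the connection settings into a dict that can be unpacked into `DataFrameWriter.options()`. It uses the `jdbc:mariadb://` URL scheme, which the MariaDB driver accepts in every version (Connector/J 3.x accepts `jdbc:mysql://` only when `permitMysqlScheme` is set), and checks that the port is a valid number.

```python
from typing import Union


def jdbc_options(host: str, port: Union[int, str], db: str, table: str,
                 user: str, password: str) -> dict:
    """Collect the options Spark's JDBC writer needs for one MariaDB table."""
    port = int(port)  # raises ValueError for a non-numeric port, e.g. a config typo
    if not 0 < port < 65536:
        raise ValueError(f"port out of range: {port}")
    return {
        "url": f"jdbc:mariadb://{host}:{port}/{db}",
        "driver": "org.mariadb.jdbc.Driver",
        "dbtable": table,
        "user": user,
        "password": password,
    }
```

With a Spark DataFrame `df`, the write then becomes `df.write.format('jdbc').options(**jdbc_options(host, port, db, 'trips', user, password)).mode('append').save()`, where `trips` is a placeholder table name.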

## Prep the Spark server with the required JDBC driver `.jar` file

On Databricks, this can be done in the UI: Compute -> cluster -> Libraries.

On local Spark, the `.jar` can be placed in `$SPARK_HOME/jars/`.

## Prep the target MySQL/MariaDB server

Create the database you want to use beforehand. This code will create tables, but not the database.
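
Spark's JDBC writer creates missing tables but not missing databases, so the database has to exist first. A minimal sketch, assuming you run the printed statement yourself in the `mysql`/`mariadb` command-line client or another SQL tool; the helper name `create_database_sql` and the `utf8mb4` default are assumptions, not requirements of this notebook.

```python
def create_database_sql(name: str, charset: str = "utf8mb4") -> str:
    """Return a MySQL/MariaDB statement that creates database `name` if missing.

    The name is quoted with backticks; a backtick inside the name is
    escaped by doubling it, as MySQL/MariaDB require.
    """
    if not name:
        raise ValueError("database name must not be empty")
    if not charset.isalnum():
        raise ValueError(f"unexpected character set: {charset!r}")
    quoted = "`" + name.replace("`", "``") + "`"
    return f"CREATE DATABASE IF NOT EXISTS {quoted} CHARACTER SET {charset}"


print(create_database_sql("teslafi"))
```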

## Import libraries and configure secrets

In this example, we use the `configparser` library to read a simple `.ini`-style file named `CREDENTIALS.config`. If you choose this method, create a section in the file like this:
```
[csvload]
sqluser = <myuser>
sqlpassword = <mypass>
sqlhost = <host or ip>
sqlport = <3306 or custom port>
sqldb = <database name>
```
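
A minimal sketch of reading that section, assuming the file sits in the notebook's working directory. The function name `load_sql_settings` is hypothetical. It fails early with a clear error when the file or section is missing, disables interpolation so a `%` in the password is read literally, and converts the port to an integer so a typo is caught before Spark tries to connect.

```python
from configparser import ConfigParser


def load_sql_settings(path: str = "CREDENTIALS.config",
                      section: str = "csvload") -> dict:
    """Read the MySQL/MariaDB connection settings from an .ini-style file."""
    # interpolation=None: a '%' in a password would otherwise raise an error
    parser = ConfigParser(interpolation=None)
    # read() silently skips missing files and returns the list it parsed
    if not parser.read(path):
        raise FileNotFoundError(f"could not read {path}")
    if not parser.has_section(section):
        raise KeyError(f"section [{section}] not found in {path}")
    cfg = parser[section]
    return {
        "user": cfg["sqluser"],
        "password": cfg["sqlpassword"],
        "host": cfg["sqlhost"],
        "port": cfg.getint("sqlport"),  # ValueError if not a number
        "db": cfg["sqldb"],
    }
```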


In [None]:
import os
from configparser import ConfigParser

# findspark.init() adds $SPARK_HOME/python to sys.path, so it has to run
# before pyspark is imported when Spark was not installed with pip.
import findspark
findspark.init()

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType  # used by the optional date cast below

# Alternative to copying the driver into $SPARK_HOME/jars
# (must be set before the Spark session is created):
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/share/java/mariadb-java-client.jar pyspark-shell'


In [None]:
# !env|grep SPARK
# !cp -v /usr/share/java/mariadb-java-client.jar $SPARK_HOME/jars/
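
The commented-out `cp` above copies the MariaDB driver into `$SPARK_HOME/jars/`. A minimal Python equivalent, assuming `SPARK_HOME` is set and the driver lives at `/usr/share/java/mariadb-java-client.jar` as in this notebook; the function name `install_jdbc_jar` is hypothetical. Run it before creating the Spark session, since jars in that directory are put on the classpath when the JVM starts.

```python
import os
import shutil
from typing import Optional


def install_jdbc_jar(jar_path: str, spark_home: Optional[str] = None) -> str:
    """Copy a JDBC driver jar into $SPARK_HOME/jars and return the new path."""
    spark_home = spark_home or os.environ.get("SPARK_HOME")
    if not spark_home:
        raise RuntimeError("SPARK_HOME is not set")
    jars_dir = os.path.join(spark_home, "jars")
    if not os.path.isdir(jars_dir):
        raise FileNotFoundError(f"{jars_dir} does not exist")
    # copy2 keeps file metadata; copying into a directory keeps the jar's name
    return shutil.copy2(jar_path, jars_dir)


# install_jdbc_jar('/usr/share/java/mariadb-java-client.jar')
```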

## Establish our Spark session

In [None]:
# spark = SparkSession.builder.config("spark.jars", "/usr/share/java/mariadb-java-client.jar").appName("TeslafiLoad").getOrCreate()
spark = SparkSession.builder.appName("CSVLoad").getOrCreate()
sc = spark.sparkContext


In [None]:
# !echo $VIRTUAL_ENV
# !env|grep SPARK
# Check that the JDBC driver jar is present in Spark's jars directory
!echo $SPARK_HOME
# !cp -v /usr/share/java/mariadb-java-client.jar $SPARK_HOME/jars/
!ls $SPARK_HOME/jars
# !pip install pandas findspark

In [None]:
# os.listdir(path='/data/data-files/teslafi/')

## Optional: create a Spark/Hive temporary table from the data
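
One way to fill this optional step, sketched under assumptions: after reading a CSV into `df` (as in the loop below), register it with `df.createOrReplaceTempView(name)` so it can be queried through `spark.sql`. The hypothetical helper `view_name_for` derives a view name from the file name, replacing characters that cannot appear in an unquoted identifier.

```python
import os
import re


def view_name_for(fname: str) -> str:
    """Turn a CSV file name into a name usable as an unquoted Spark SQL identifier."""
    stem = os.path.splitext(os.path.basename(fname))[0]
    # Replace every run of characters other than letters, digits and '_'
    name = re.sub(r"\W+", "_", stem).strip("_").lower()
    if not name:
        raise ValueError(f"cannot derive a view name from {fname!r}")
    # Prefix names starting with a digit; a name like '2022' would read as a number
    return "t_" + name if name[0].isdigit() else name
```

Inside the loop this would be used as `df.createOrReplaceTempView(view_name_for(fname))`, after which `spark.sql('SELECT COUNT(*) FROM ' + view_name_for(fname)).show()` queries the data. Temporary views disappear when the Spark session ends.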

## Write CSVs to MySQL/MariaDB

In [None]:
# File location and type
csvdir = '/data/graph-data/AntiFraud/data/'
# csvdir = '/data/data-files/teslafi/'

file_type = "csv"
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# MySQL/MariaDB info
# Get credentials from file; interpolation=None keeps a '%' in the password literal
config_section = 'csvload'
parser = ConfigParser(interpolation=None)
# read() returns the files it parsed, so an empty list means the file is missing
if not parser.read('CREDENTIALS.config'):
    raise FileNotFoundError('CREDENTIALS.config not found')

user = parser.get(config_section, 'sqluser')
password = parser.get(config_section, 'sqlpassword')
host = parser.get(config_section, 'sqlhost')
port = parser.get(config_section, 'sqlport')
db = parser.get(config_section, 'sqldb')
# MariaDB Connector/J 3.x rejects jdbc:mysql: URLs unless permitMysqlScheme is set
sqlurl = 'jdbc:mariadb://' + host + ':' + port + '/' + db

# On Databricks, list the files with dbutils instead:
# for path, name in [(f.path, f.name) for f in dbutils.fs.ls(csvdir) if f.path.endswith('.csv')]:
for fname in sorted(os.listdir(csvdir)):
    if not fname.endswith('.csv'):
        continue
    print(fname)
    # if fname not in ['ff_party.csv', 'ff_company.csv']:
    #     print('skipping ' + fname)
    #     continue
    df = (spark.read.format(file_type)
          .option("inferSchema", infer_schema)
          .option("header", first_row_is_header)
          .option("sep", delimiter)
          .load(os.path.join(csvdir, fname)))
    # Optionally store inferred timestamp columns as dates:
    # for col_name, col_type in df.dtypes:
    #     if col_type == 'timestamp':
    #         df = df.withColumn(col_name, df[col_name].cast(DateType()))
    #         print('   CHANGED column ' + col_name + ' to Date type')

    # The table is named after the file, without the .csv extension.
    # mode('append') adds rows to an existing table, so re-running this cell
    # duplicates data; mode('overwrite') replaces the table instead.
    df.write.format('jdbc').options(
        url=sqlurl,
        driver='org.mariadb.jdbc.Driver',
        dbtable=os.path.splitext(fname)[0],
        user=user,
        password=password).mode('append').save()

In [None]:
# df holds the last CSV loaded by the loop above; check the inferred column types
df.dtypes


In [None]:
# toPandas() collects every row to the driver, so use it only on data that fits in memory
p = df.toPandas()

In [None]:
p['polling'].unique()


