# Read / Write Operations to Postgres using PySpark
___

### Initializing PySpark

In [1]:
import findspark
findspark.init()
import pyspark

### Imports

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

### MySQL Dependancy Jar

In [3]:
mysql_jar = 'F:\\My_Practice\\Database_Connector\\Resources\\mysql-connector-java-8.0.17.jar'

### Creating Spark Session

In [4]:
spark = (SparkSession.builder
           .config("spark.jars",mysql_jar)
           .appName('mysql')
           .getOrCreate()
        )

### Checking Spark Configurations

In [5]:
SparkConf().getAll()

[('spark.app.name', 'mysql'),
 ('spark.jars',
  'file:///F:/My_Practice/Database_Connector/Resources/mysql-connector-java-8.0.17.jar'),
 ('spark.repl.local.jars',
  'file:///F:/My_Practice/Database_Connector/Resources/mysql-connector-java-8.0.17.jar'),
 ('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

### MySQL Server Credentials

In [6]:
url="jdbc:mysql://192.168.2.19:3306/mysql"
driver = "com.mysql.cj.jdbc.Driver"
#driver = "com.mysql.jdbc.Driver"
dbtable = "si_attrib"
user="root"
password="Aline-12#"

### Reading File

In [7]:
filedf = (spark.read.format("csv")
                .options(header=True, inferSchema=True, delimiter='\t')
                .load('../Resources/SI_attr.tab')
         )

### Show

In [8]:
filedf.show(5)

+------------------+----------------------+-----+-----------------------+----------------+--------------------------------+
|     Sales_Item_Id|Sales_Item_Description|  GIC|Product_Lifecycle_State|Purchasing_Group|Average Standard Production Cost|
+------------------+----------------------+-----+-----------------------+----------------+--------------------------------+
|    IE2:15HP-RFU-7|           7GHZ HP ODU|07599|               OBSOLETE|            null|                             0.0|
|    IE2:15HP-RFU-8|  1500HP RF UNIT, F...|07599|               OBSOLETE|            null|                             0.0|
|IE2:15HP-SHORT-112|    1500HP SHORT, FGHZ|07599|               OBSOLETE|            null|                             0.0|
|IE2:15HP-SHORT-137|     1500P SHORT, FGHZ|07599|               OBSOLETE|            null|                             0.0|
| IE2:15HP-TERM-112|  1500HP 50 OHM TER...|09208|               OBSOLETE|            null|                             0.0|
+-------

### Schema

In [9]:
filedf.printSchema()

root
 |-- Sales_Item_Id: string (nullable = true)
 |-- Sales_Item_Description: string (nullable = true)
 |-- GIC: string (nullable = true)
 |-- Product_Lifecycle_State: string (nullable = true)
 |-- Purchasing_Group: string (nullable = true)
 |-- Average Standard Production Cost: double (nullable = true)



### Reading table from MySQL

In [10]:
tableDF = (spark.read
            .format("jdbc")
            .options(url=url, driver=driver, dbtable=dbtable, user=user, password=password)
            .load()
          )

### Checking Schema of the table

In [11]:
tableDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- SALES_ITEM_ID: string (nullable = true)
 |-- SALES_ITEM_DESCRIPTION: string (nullable = true)
 |-- GIC: string (nullable = true)
 |-- PRODUCT_LIFECYCLE_STATE: string (nullable = true)
 |-- PURCHASING_GROUP: string (nullable = true)
 |-- AVERAGESTANDARDPRODUCTIONCOST: double (nullable = true)



### Changing Dataframe columns same as table columns

In [12]:
filedf = filedf.withColumnRenamed("Average Standard Production Cost","AVERAGESTANDARDPRODUCTIONCOST")

### Converting all file dataframe columns to upper case

In [13]:
filedf = filedf.toDF(*[c.upper() for c in filedf.columns])

### Generating sequential id column using zipWithIndex

In [14]:
from pyspark.sql import Row
from pyspark.sql.types import IntegerType, StructType, StructField

In [15]:
schema = StructType(filedf.schema.fields + [StructField('id', IntegerType(), False)])

In [16]:
rddWithId = filedf.rdd.zipWithIndex()

In [17]:
filedf = spark.createDataFrame(rddWithId.map(lambda row: row[0] + Row(row[1] + 1)), schema)

### Show

In [18]:
filedf.show(5)

+------------------+----------------------+-----+-----------------------+----------------+-----------------------------+---+
|     SALES_ITEM_ID|SALES_ITEM_DESCRIPTION|  GIC|PRODUCT_LIFECYCLE_STATE|PURCHASING_GROUP|AVERAGESTANDARDPRODUCTIONCOST| id|
+------------------+----------------------+-----+-----------------------+----------------+-----------------------------+---+
|    IE2:15HP-RFU-7|           7GHZ HP ODU|07599|               OBSOLETE|            null|                          0.0|  1|
|    IE2:15HP-RFU-8|  1500HP RF UNIT, F...|07599|               OBSOLETE|            null|                          0.0|  2|
|IE2:15HP-SHORT-112|    1500HP SHORT, FGHZ|07599|               OBSOLETE|            null|                          0.0|  3|
|IE2:15HP-SHORT-137|     1500P SHORT, FGHZ|07599|               OBSOLETE|            null|                          0.0|  4|
| IE2:15HP-TERM-112|  1500HP 50 OHM TER...|09208|               OBSOLETE|            null|                          0.0|  5|


### File Dataframe Schema

In [19]:
filedf.printSchema()

root
 |-- SALES_ITEM_ID: string (nullable = true)
 |-- SALES_ITEM_DESCRIPTION: string (nullable = true)
 |-- GIC: string (nullable = true)
 |-- PRODUCT_LIFECYCLE_STATE: string (nullable = true)
 |-- PURCHASING_GROUP: string (nullable = true)
 |-- AVERAGESTANDARDPRODUCTIONCOST: double (nullable = true)
 |-- id: integer (nullable = false)



### Checking Table Count before writing

In [20]:
tableDF.count()

0

### Reducing File Count to 1000

In [21]:
filedf = filedf.limit(1000)

### Writing to MySQL

In [22]:
(filedf.write
  .format("jdbc")
  .options(url=url, dbtable=dbtable, user=user, password=password, driver=driver)
  .mode('append')
  .save()
)

### Closing Spark Session

In [23]:
spark.stop()