# Libraries

In [1]:
import os 
import numpy as np
import pandas as pd
import mysql.connector

from pyspark.sql import SparkSession
# Build (or reuse, if one already exists) the Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName("Python Spark create RDD example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Display the current working directory; the notebook later refers to files
# through relative paths (e.g. the login file and the Datasets folder).
os.getcwd()

22/06/27 17:47:59 WARN Utils: Your hostname, cesar-GL62M-7RDX resolves to a loopback address: 127.0.1.1; using 192.168.1.155 instead (on interface wlp2s0)
22/06/27 17:47:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/06/27 17:48:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'/home/cesar/Python_NBs/pySpark_Notes'

# User-Defined Functions

In [2]:
# For privacy reasons, the MySQL credentials live in a plain-text file in the
# same folder as this notebook:
#   * first non-empty line  -> user name
#   * second non-empty line -> password

def load_mysql_login(path='mysql_login.txt'):
    """Return ``(user, password)`` read from a two-line credentials file.

    The file is read as plain text rather than through ``pd.read_csv`` so the
    values are kept exactly as written: a comma inside the password is not
    treated as a column separator, and an all-digit password is not turned
    into a number (which would also drop leading zeros).

    Parameters
    ----------
    path : str
        Location of the credentials file.

    Returns
    -------
    tuple of (str, str)
        The user name and the password.

    Raises
    ------
    OSError
        If the file is missing or cannot be read.
    ValueError
        If the file holds fewer than two non-empty lines.
    """
    with open(path, encoding='utf-8') as handle:
        # Only the line terminator is removed; any other whitespace is kept
        # because it may be part of the credential.
        entries = [line.rstrip('\r\n') for line in handle]
    entries = [entry for entry in entries if entry]

    if len(entries) < 2:
        raise ValueError(
            f'{path!r} must contain the user name and the password on separate lines'
        )
    return entries[0], entries[1]


try:
    mysql_user, mysql_pwd = load_mysql_login(r'mysql_login.txt')
    print('User information is ready!')
except (OSError, ValueError) as exc:
    # Only the expected "file missing / file malformed" failures are handled;
    # anything else (e.g. KeyboardInterrupt) is allowed to propagate.
    print(f'Login information is not available!!! ({exc})')

User information is ready!


In [3]:
# Open the MySQL connection used by the SQL cells of this notebook.
# mysql_user / mysql_pwd are the values read from the login file.
mysql_conn = mysql.connector.connect(
    host="localhost",
    user=mysql_user,
    password=mysql_pwd,
    database='HDL_Project',
)
    
def read_sql(mysql_conn, sqlq, rdd = False):
    """Run a SQL query and return its result as a DataFrame.

    Parameters
    ----------
    mysql_conn : DB-API connection
        Open connection that is handed to ``pd.read_sql``.
    sqlq : str
        SQL query to execute.
    rdd : bool, default False
        When truthy, the pandas result is converted with
        ``spark.createDataFrame`` using the notebook-level ``spark`` session.
        Despite the parameter name, the converted object is a Spark
        DataFrame, not a bare RDD.

    Returns
    -------
    pandas.DataFrame or pyspark.sql.DataFrame
        The pandas result, or its Spark conversion when ``rdd`` is truthy.
    """
    # pd.read_sql manages its own cursor on the connection, so no separate
    # cursor is opened here (the previous one was never used and was left
    # open whenever the query raised).
    result = pd.read_sql(sqlq, con=mysql_conn)

    # Convert the pandas DataFrame to a Spark DataFrame on request
    if rdd:
        result = spark.createDataFrame(result)

    return result

# Data

## Creating RDD's using parallelize()

In [4]:
# Shorthand for SparkContext.parallelize; the next cells call it with local Python lists.
parallelize = spark.sparkContext.parallelize

In [5]:
# Rows typed in by hand: parallelize() distributes them as an RDD and
# toDF() attaches the column names, giving a DataFrame.
station_rows = [
    ('SE', 'La Pastora'),
    ('NE', 'San Nicolás'),
    ('CE', 'Obispado'),
    ('NO', 'San Bernabé'),
    ('SO', 'Sta. Catarina'),
    ('NO2', 'García'),
    ('NTE', 'Escobedo'),
    ('NE2', 'Apodaca'),
    ('SE2', 'Juárez'),
    ('SO2', 'San Pedro'),
    ('SE3', 'Cadereyta'),
    ('SUR', 'Pueblo Serena'),
    ('NTE2', 'Universidad'),
]
df = parallelize(station_rows).toDF(['station_code', 'station_name'])

# Preview the first five rows
df.show(5)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+------------+-------------+
|station_code| station_name|
+------------+-------------+
|          SE|   La Pastora|
|          NE|  San Nicolás|
|          CE|     Obispado|
|          NO|  San Bernabé|
|          SO|Sta. Catarina|
+------------+-------------+
only showing top 5 rows



In [6]:
# Build a plain RDD (no toDF step) from number pairs typed in by hand
pairs = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
myData = parallelize(pairs)

# Return every element of the RDD as a Python list to inspect it
myData.collect()

[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]

## Creating DataFrames using createDataFrame()

In [7]:
# Shorthand for SparkSession.createDataFrame, used by the next cell.
createDataFrame = spark.createDataFrame

In [8]:
# Same station catalogue as before, this time handed straight to
# createDataFrame() as (rows, column names).
station_columns = ['station_code', 'station_name']
station_records = [
    ('SE', 'La Pastora'),
    ('NE', 'San Nicolás'),
    ('CE', 'Obispado'),
    ('NO', 'San Bernabé'),
    ('SO', 'Sta. Catarina'),
    ('NO2', 'García'),
    ('NTE', 'Escobedo'),
    ('NE2', 'Apodaca'),
    ('SE2', 'Juárez'),
    ('SO2', 'San Pedro'),
    ('SE3', 'Cadereyta'),
    ('SUR', 'Pueblo Serena'),
    ('NTE2', 'Universidad'),
]
stations = createDataFrame(station_records, station_columns)

stations.show()

+------------+-------------+
|station_code| station_name|
+------------+-------------+
|          SE|   La Pastora|
|          NE|  San Nicolás|
|          CE|     Obispado|
|          NO|  San Bernabé|
|          SO|Sta. Catarina|
|         NO2|       García|
|         NTE|     Escobedo|
|         NE2|      Apodaca|
|         SE2|       Juárez|
|         SO2|    San Pedro|
|         SE3|    Cadereyta|
|         SUR|Pueblo Serena|
|        NTE2|  Universidad|
+------------+-------------+



## Creating RDDs using read() and load() functions

`spark.read.format("csv").load("path")`

In [9]:
# Built-in CSV data source. The former 'com.databricks.spark.csv' name belongs
# to the external spark-csv package that predates the built-in reader.
read_format = 'csv'
file_location = "Datasets/stations.csv"

# The header / schema-inference options are given once, through options();
# passing header again to load() was redundant.
df = (
    spark.read.format(read_format)
    .options(header='true', inferschema='true')
    .load(file_location)
)

df.show(5)
df.printSchema()

+------------+-------------+
|station_code| station_name|
+------------+-------------+
|          SE|   La Pastora|
|          NE|  San Nicolás|
|          CE|     Obispado|
|          NO|  San Bernabé|
|          SO|Sta. Catarina|
+------------+-------------+
only showing top 5 rows

root
 |-- station_code: string (nullable = true)
 |-- station_name: string (nullable = true)



## Creating RDD's using spark.read.csv("path")

In [10]:
# Read the same CSV through the csv() shortcut, with the reader options
# supplied as keyword arguments
df = spark.read.csv(file_location, header=True, inferSchema=True)

# Display the data and the inferred schema
df.show()
df.printSchema()

+------------+-------------+
|station_code| station_name|
+------------+-------------+
|          SE|   La Pastora|
|          NE|  San Nicolás|
|          CE|     Obispado|
|          NO|  San Bernabé|
|          SO|Sta. Catarina|
|         NO2|       García|
|         NTE|     Escobedo|
|         NE2|      Apodaca|
|         SE2|       Juárez|
|         SO2|    San Pedro|
|         SE3|    Cadereyta|
|         SUR|Pueblo Serena|
|        NTE2|  Universidad|
+------------+-------------+

root
 |-- station_code: string (nullable = true)
 |-- station_name: string (nullable = true)



# Create RDD's from MySQL

In [11]:
sqlq = "SELECT * FROM cat_stations"

# Create a pandas dataframe. pd.read_sql manages its own cursor on the
# connection, so the cursor that used to be opened and closed around this
# call (without ever being used) has been dropped.
pdf = pd.read_sql(sqlq, con=mysql_conn)

# Convert Pandas dataframe to spark DataFrame
df = spark.createDataFrame(pdf)

df.show()



+------------+-------------+
|station_code| station_name|
+------------+-------------+
|          SE|   La Pastora|
|          NE|  San Nicolás|
|          CE|     Obispado|
|          NO|  San Bernabé|
|          SO|Sta. Catarina|
|         NO2|       García|
|         NTE|     Escobedo|
|         NE2|      Apodaca|
|         SE2|       Juárez|
|         SO2|    San Pedro|
|         SE3|    Cadereyta|
|         SUR|Pueblo Serena|
|        NTE2|  Universidad|
+------------+-------------+



In [12]:
# The inline steps of the previous cell, now done through the reusable read_sql() helper.
sqlq = "SELECT * FROM cat_stations"

cat_stations_df = read_sql(mysql_conn, sqlq, rdd=True)
cat_stations_df.show()



+------------+-------------+
|station_code| station_name|
+------------+-------------+
|          SE|   La Pastora|
|          NE|  San Nicolás|
|          CE|     Obispado|
|          NO|  San Bernabé|
|          SO|Sta. Catarina|
|         NO2|       García|
|         NTE|     Escobedo|
|         NE2|      Apodaca|
|         SE2|       Juárez|
|         SO2|    San Pedro|
|         SE3|    Cadereyta|
|         SUR|Pueblo Serena|
|        NTE2|  Universidad|
+------------+-------------+



# Transformations

## Create RDD from Dictionary

In [13]:
# Column-oriented sample data: each key is a column name, each list holds that column's values
d = dict(
    A=[0, 1, 0],
    B=[1, 0, 1],
    C=[1, 0, 0],
)

In [14]:
# Tedious for PySpark: createDataFrame() is given rows here, so the
# column-wise dict is transposed into one tuple per row first.
row_tuples = list(zip(*d.values()))
column_names = list(d.keys())
ds = spark.createDataFrame(row_tuples, column_names)
ds.show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  0|  1|  1|
|  1|  0|  0|
|  0|  1|  0|
+---+---+---+



## Create RDD from JSON file

In [15]:
# Load the JSON file into a DataFrame and preview the first five rows
json_df = spark.read.json('Datasets/s31_220415.json')
json_df.show(5)

+--------------------+--------------------+--------------------+------+--------+-------+--------+------------------+
|             created|                date|                 gas|precip|pressure|station|    temp|              wind|
+--------------------+--------------------+--------------------+------+--------+-------+--------+------------------+
|2022-04-15T00:00:...|2022-04-15T12:00:...|{0, 35.0462, 18.2...|17.792|924.4951|     31| 292.054| [-1.1861, 0.1817]|
|2022-04-15T00:00:...|2022-04-17T23:00:...|{0, 87.1104, 25.5...|0.0059|921.5762|     31|308.5085|[-4.3963, -1.0718]|
|2022-04-15T00:00:...|2022-04-18T00:00:...|{0, 70.8924, 25.3...|   0.0|922.0118|     31|306.3807| [-2.496, -0.7729]|
|2022-04-15T00:00:...|2022-04-18T01:00:...|{0, 61.5726, 28.5...|   0.0|922.9371|     31|304.4861|[-2.4889, -0.6556]|
|2022-04-15T00:00:...|2022-04-18T02:00:...|{0, 53.8242, 22.574}|   0.0|924.3472|     31|302.8295| [-3.0352, 0.2908]|
+--------------------+--------------------+--------------------+------+--------+-------+--------+------------------+

# Actions

Operating on a Spark DataFrame is much like working with a Pandas dataframe.
(Due to space concerns, an example is not shown for each operation.) <br>
You may:
* **df.columns**: Show column names
* **df.dtypes**: Show data types for every column
* **df.fillna(X)**: Fill NA values with an X value.
* **df.replace(['A1','B1'], ['A2','B2'], subset=['col'])**: Replace values in a specific column (col). Mixing data types is not allowed.
* [E1]**df.toDF(*new_names)**: Change column names by defining an array with new names
* **df.drop(*drop_name)**: Drop data column.
* Filter
* **df.withColumn()**: Add new column
* Join
* Concatenate columns
* GroupBy
* Pivot
* Window
* Rank

In [16]:
# E1: rename all columns at once; the new names are unpacked into toDF()
# as positional arguments, one per column.
new_names = ['A1', 'B1', 'C1']
ds = ds.toDF(*new_names)
ds.show()

+---+---+---+
| A1| B1| C1|
+---+---+---+
|  0|  1|  1|
|  1|  0|  0|
|  0|  1|  0|
+---+---+---+



# Dictionary of Concepts

* **Resilient Distributed Dataset (RDD)**: An RDD in Spark is simply an immutable, distributed collection of objects. Each RDD is split into multiple partitions (smaller subsets of the same data), which may be computed on different nodes of the cluster.
* **Spark Transformations**: Transformations construct a new RDD from a previous one. 
* **Spark Actions**: Actions compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).

# Dictionary of functions

* **parallelize()**: Distribute a local Python collection to form an RDD.
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.SparkContext.parallelize.html
* **createDataFrame()**: Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.SparkSession.createDataFrame.html

# Source

* 5. Programming with RDDs. <br>
https://runawayhorse001.github.io/LearningApacheSpark/rdd.html