#### `Import Libraries`

In [1]:
import pandas as pd 
import numpy as np
import os
import shutil
import tempfile
import warnings
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext as sc
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display, HTML

warnings.filterwarnings("ignore")

#### `Random Data Generation`

In [2]:
# Seed NumPy's global RNG so the generated sample is the same on every run.
np.random.seed(12)
state_list = ["A", "B", "C", "D", "E", "F"]

# Collect the rows in a plain list and build the DataFrame once at the end.
# Growing a DataFrame with pd.concat inside the loop copies every earlier row
# again on each iteration (quadratic work) for no benefit.
records = []
for i in range(20):
    # Choose one state at random from the list
    state = np.random.choice(state_list)
    # Generate one integer value in [0, 100)
    value = np.random.randint(100, size=(1))[0]
    records.append({"State": state, "Value": value})

# Explicit column order keeps the frame layout stable: State, then Value
df = pd.DataFrame(records, columns=["State", "Value"])

#### `Data Overview`

In [3]:
# Render the generated frame as an HTML table in the cell output
overview_html = df.to_html()
display(HTML(overview_html))

Unnamed: 0,State,Value
0,D,27
1,F,2
2,D,67
3,E,48
4,B,52
5,F,13
6,B,34
7,D,74
8,A,76
9,F,82


#### `Saving Into .csv File`

In [10]:
# Stage the CSV in a temporary file and then copy it to the real target path
# (kept from the original cell, which did this to handle a direct file
# creation issue). delete=False means the file is NOT removed automatically
# when the handle closes, so it is deleted explicitly in `finally` below.
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as temp_file:
    pass  # only the reserved path is needed; close the handle before writing

try:
    # Save dataframe as temp csv file
    df.to_csv(temp_file.name, index=False)
    print("temp_file.name --> {0}".format(temp_file.name))

    # Output file path: <parent of the working directory>/Data/data.csv
    output_dir = os.path.dirname(os.getcwd())
    output_file_path = os.path.join(output_dir, "Data", "data.csv")
    print(f"output_file_path --> {output_file_path}")

    # Make sure the folder that actually receives the file exists
    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)

    # Copy temp .csv file to actual path
    shutil.copy2(temp_file.name, output_file_path)
    print("File saved!!")
finally:
    # Clean up the staging file whether or not the copy succeeded
    os.remove(temp_file.name)

temp_file.name --> /tmp/tmpvxer71h6.csv
output_file_path --> /mnt/Local_Host/Git_Repo/PysparkCodeHub/Data/data.csv
File saved!!


#### `Creating Spark DataFrame Using Sample Data`

In [12]:
# Build (or reuse) the Spark session used by this notebook
spark = (
    SparkSession.builder
    .appName('PySpark Data Analysis')
    .getOrCreate()
)

# Column names, in the same order as the fields of each sample tuple
columns = ["first_name","middle_name","last_name","dob","gender","salary"]

# Sample records: five string fields followed by an integer salary
data = [
    ("AAAA","","SSSS","43435","M",80000),
    ("MMMMM","RRRR","","2356","F",20000),
    ("RRRR","","WWWW","2343","",5460000),
    ("MMMM","AAAA","JJJJ","56734","F",6570000),
    ("JJJJ","MMMM","BBBB","","M",0),
]

# Create the Spark DataFrame; column types are inferred from the tuples
spark_df = spark.createDataFrame(data=data, schema=columns)

# Inspect it: schema, Python type, then the rows themselves
spark_df.printSchema()
print(type(spark_df))
spark_df.show()

# Bring the rows to the driver as a pandas DataFrame and render it as HTML
pandas_df = spark_df.toPandas()
print(type(pandas_df))
pandas_html = pandas_df.to_html()
display(HTML(pandas_html))

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

<class 'pyspark.sql.dataframe.DataFrame'>
+----------+-----------+---------+-----+------+-------+
|first_name|middle_name|last_name|  dob|gender| salary|
+----------+-----------+---------+-----+------+-------+
|      AAAA|           |     SSSS|43435|     M|  80000|
|     MMMMM|       RRRR|         | 2356|     F|  20000|
|      RRRR|           |     WWWW| 2343|      |5460000|
|      MMMM|       AAAA|     JJJJ|56734|     F|6570000|
|      JJJJ|       MMMM|     BBBB|     |     M|      0|
+----------+-----------+---------+-----+------+-------+

<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,first_name,middle_name,last_name,dob,gender,salary
0,AAAA,,SSSS,43435.0,M,80000
1,MMMMM,RRRR,,2356.0,F,20000
2,RRRR,,WWWW,2343.0,,5460000
3,MMMM,AAAA,JJJJ,56734.0,F,6570000
4,JJJJ,MMMM,BBBB,,M,0


#### `Read .CSV File & Register into Spark TempTable`

In [22]:
# Reuse the running SparkContext (or start one) and wrap it in a SQLContext.
conf = pyspark.SparkConf()
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# A single SQLContext is enough; both names used by this cell refer to it.
sqlCtx = SQLContext(sc)
sqlcontext = sqlCtx

# Input file path: <parent of the working directory>/Data/data.csv
input_dir = os.path.dirname(os.getcwd())
input_file_path = os.path.join(input_dir, "Data", "data.csv")

# Read the .csv: first line is the header, column types are inferred
spark_df_csv_sample_data = (
    sqlCtx.read
    .format("csv")
    .options(header='true', inferschema='true', delimiter=',')
    .load(input_file_path)
)

# Expose the DataFrame to SQL under the given name.
# createOrReplaceTempView replaces the deprecated registerTempTable.
spark_df_csv_sample_data.createOrReplaceTempView("spark_df_csv_sample_data")

# Query Over Temp Table
sqlCtx.sql("select * from spark_df_csv_sample_data limit 5").show()

# Print Data Schema
spark_df_csv_sample_data.printSchema()

# Print DataFrame Type
print(type(spark_df_csv_sample_data))

# Count Rows
print(spark_df_csv_sample_data.count())

# Print Spark Dataframe (alternatives: show(5, truncate=30), show(vertical=True))
spark_df_csv_sample_data.show(5, truncate=True)

+-----+-----+
|State|Value|
+-----+-----+
|    D|   27|
|    F|    2|
|    D|   67|
|    E|   48|
|    B|   52|
+-----+-----+

root
 |-- State: string (nullable = true)
 |-- Value: integer (nullable = true)

<class 'pyspark.sql.dataframe.DataFrame'>
20
+-----+-----+
|State|Value|
+-----+-----+
|    D|   27|
|    F|    2|
|    D|   67|
|    E|   48|
|    B|   52|
+-----+-----+
only showing top 5 rows



#### `Exploring Pyspark Predefined Functions`

In [45]:
# head 5 : way 1 -- limit() narrows the DataFrame, collect() returns Row objects
first_five = spark_df_csv_sample_data.limit(5)
first_five.collect()

[Row(State='D', Value=27),
 Row(State='F', Value=2),
 Row(State='D', Value=67),
 Row(State='E', Value=48),
 Row(State='B', Value=52)]

In [25]:
# head 2 : way 2 -- take(n) returns the first n rows as a list of Row objects
spark_df_csv_sample_data.take(2)

[Row(State='D', Value=27), Row(State='F', Value=2)]

In [27]:
# Keep only the "State" column of the first two rows and print it
state_only = spark_df_csv_sample_data.limit(2).select("State")
state_only.show()

+-----+
|State|
+-----+
|    D|
|    F|
+-----+



In [46]:
# Rename "State" to "State_New" via an alias inside select(); "Value" is kept as is
renamed_columns = [col("State").alias("State_New"), col("Value")]
(
    spark_df_csv_sample_data
    .limit(2)
    .select(*renamed_columns)
    .show()
)

+---------+-----+
|State_New|Value|
+---------+-----+
|        D|   27|
|        F|    2|
+---------+-----+



In [30]:
# Copy of the DataFrame's column names as a plain Python list
list(spark_df_csv_sample_data.columns)

['State', 'Value']

In [31]:
# Select by a slice of the column-name list (here the first two names), show 3 rows
first_two_columns = spark_df_csv_sample_data.columns[:2]
spark_df_csv_sample_data.select(first_two_columns).show(3)

+-----+-----+
|State|Value|
+-----+-----+
|    D|   27|
|    F|    2|
|    D|   67|
+-----+-----+
only showing top 3 rows



In [34]:
# "*" selects every column; truncate=False prints values without shortening them
all_columns_preview = spark_df_csv_sample_data.limit(3).select("*")
all_columns_preview.show(truncate=False)

+-----+-----+
|State|Value|
+-----+-----+
|D    |27   |
|F    |2    |
|D    |67   |
+-----+-----+



In [36]:
# map() is called on an RDD here, so the DataFrame is first exposed as an RDD
# through its .rdd attribute; each row becomes a (State, Value) tuple
rdd = (
    spark_df_csv_sample_data
    .rdd
    .map(lambda record: (record["State"], record["Value"]))
)
print(rdd)

# Turn the RDD of tuples back into a DataFrame with named columns
column_names = ["State","Value"]
df2 = rdd.toDF(column_names)
print(type(df2))
print(df2)
df2.show(5)

PythonRDD[253] at RDD at PythonRDD.scala:53
<class 'pyspark.sql.dataframe.DataFrame'>
DataFrame[State: string, Value: bigint]
+-----+-----+
|State|Value|
+-----+-----+
|    D|   27|
|    F|    2|
|    D|   67|
|    E|   48|
|    B|   52|
+-----+-----+
only showing top 5 rows



In [37]:
# Head 2: first two elements of the RDD
rdd.take(2)

[('D', 27), ('F', 2)]

In [38]:
# List of the first tuple field (index 0) of the first two RDD elements
first_two = rdd.take(2)
[pair[0] for pair in first_two]

['D', 'F']

In [39]:
# List of the second tuple field (index 1) of the first two RDD elements
first_two = rdd.take(2)
[pair[1] for pair in first_two]

[27, 2]

In [42]:
# Data collect: return every element of the RDD as a Python list
rdd.collect()

[('D', 27),
 ('F', 2),
 ('D', 67),
 ('E', 48),
 ('B', 52),
 ('F', 13),
 ('B', 34),
 ('D', 74),
 ('A', 76),
 ('F', 82),
 ('B', 35),
 ('E', 35),
 ('B', 30),
 ('A', 18),
 ('C', 80),
 ('E', 6),
 ('D', 73),
 ('F', 30),
 ('A', 32),
 ('D', 59)]

In [43]:
# Collect every row and keep only its "Value" field
collected_rows = spark_df_csv_sample_data.rdd.collect()
table = [record["Value"] for record in collected_rows]

# Print the values one per line
for row in table:
    print(row)

27
2
67
48
52
13
34
74
76
82
35
35
30
18
80
6
73
30
32
59


In [44]:
# Fetch the first two (State, Value) rows as a list
rows_looped = (
    spark_df_csv_sample_data
    .select("State", "Value")
    .limit(2)
    .collect()
)
print(type(spark_df_csv_sample_data))
print(spark_df_csv_sample_data)

# Positional access on each collected row: index 0 is State, index 1 is Value
for rows in rows_looped:
    state, value = rows[0], rows[1]
    print(state, "-->", value)

<class 'pyspark.sql.dataframe.DataFrame'>
DataFrame[State: string, Value: int]
D --> 27
F --> 2
