# Read, Write, and Validate Data in PySpark

Every Spark session starts by creating (or retrieving) a SparkSession instance, as shown below.

In [3]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ReadWriteVal").getOrCreate()

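# getExecutorMemoryStatus() has one entry per block manager (the driver plus
# each executor), so this counts executors rather than CPU cores; in local mode
# it is 1 regardless of the machine's core count (spark.sparkContext.defaultParallelism
# is a closer proxy for the cores available in local mode)
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")

# Evaluating the session in a notebook cell displays a link to the Spark UI
spark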

You are working with 1 core(s)


## Read a CSV file in Spark

In [5]:
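# Folder containing the course datasets (adjust to your own machine)
path = 'C:/Users/kava2/Documents/Udemy/Spark_Datasets/'

# header=True uses the first line as column names; inferSchema=True makes an
# extra pass over the data to guess each column's type
pga = spark.read.csv(path + 'pga_tour_historical.csv', inferSchema=True, header=True)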

### 1) View the First 5 Rows of the DataFrame

Let us view the first five rows of the DataFrame to see what is inside it. We can do this in two ways: with show(), or by converting the first rows to pandas with toPandas().

In [8]:
pga.show(5)

+---------------+------+----------------+--------------------+-----+
|    Player Name|Season|       Statistic|            Variable|Value|
+---------------+------+----------------+--------------------+-----+
|Robert Garrigus|  2010|Driving Distance|Driving Distance ...|   71|
|   Bubba Watson|  2010|Driving Distance|Driving Distance ...|   77|
| Dustin Johnson|  2010|Driving Distance|Driving Distance ...|   83|
|Brett Wetterich|  2010|Driving Distance|Driving Distance ...|   54|
|    J.B. Holmes|  2010|Driving Distance|Driving Distance ...|  100|
+---------------+------+----------------+--------------------+-----+
only showing top 5 rows



In [9]:
pga.limit(5).toPandas()

|   | Player Name | Season | Statistic | Variable | Value |
|---|---|---|---|---|---|
| 0 | Robert Garrigus | 2010 | Driving Distance | Driving Distance - (ROUNDS) | 71 |
| 1 | Bubba Watson | 2010 | Driving Distance | Driving Distance - (ROUNDS) | 77 |
| 2 | Dustin Johnson | 2010 | Driving Distance | Driving Distance - (ROUNDS) | 83 |
| 3 | Brett Wetterich | 2010 | Driving Distance | Driving Distance - (ROUNDS) | 54 |
| 4 | J.B. Holmes | 2010 | Driving Distance | Driving Distance - (ROUNDS) | 100 |


Note that toPandas() collects every row of the Spark DataFrame onto the driver as a pandas DataFrame, so it is neither time- nor memory-efficient on large data; it is safe here only because limit(5) is applied first.

### 2) Print the Schema Details

Now let us print the schema that Spark inferred for the DataFrame, to make sure each column got the right type. Inference is not always correct, so we need to watch out!

In [12]:
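# printSchema() prints the schema tree itself and returns None,
# so it does not need to be wrapped in print()
pga.printSchema()
print(" ")
print(pga.columns)
print(" ")

# describe() returns a new DataFrame of summary statistics; printing it only
# shows that DataFrame's column names and types. Call .show() to compute them.
print(pga.describe())

root
 |-- Player Name: string (nullable = true)
 |-- Season: integer (nullable = true)
 |-- Statistic: string (nullable = true)
 |-- Variable: string (nullable = true)
 |-- Value: string (nullable = true)
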
 
['Player Name', 'Season', 'Statistic', 'Variable', 'Value']
 
DataFrame[summary: string, Player Name: string, Season: string, Statistic: string, Variable: string, Value: string]
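Reading the printSchema() output by eye works for five columns, but the check can also be automated. DataFrame.dtypes returns a list of (column name, type string) pairs such as ('Season', 'int'). The sketch below compares such a list with the types we expect; the helper name schema_mismatches and the expected mapping are our own assumptions, and the pairs are typed out from the schema printed above so that the block runs without Spark.

```python
from typing import Dict, List, Tuple


def schema_mismatches(actual: List[Tuple[str, str]],
                      expected: Dict[str, str]) -> List[Tuple[str, str, str]]:
    """Return (column, actual_type, expected_type) for every disagreement.

    Columns absent from `actual` are reported with actual_type "missing".
    """
    actual_map = dict(actual)
    problems = []
    for column, want in expected.items():
        got = actual_map.get(column, "missing")
        if got != want:
            problems.append((column, got, want))
    return problems


# Mirrors the printSchema() output above; in the notebook you would pass pga.dtypes.
inferred = [("Player Name", "string"), ("Season", "int"),
            ("Statistic", "string"), ("Variable", "string"),
            ("Value", "string")]
# Our assumption about what the PGA columns should be.
expected = {"Player Name": "string", "Season": "int",
            "Statistic": "string", "Variable": "string", "Value": "int"}

print(schema_mismatches(inferred, expected))  # [('Value', 'string', 'int')]
```

In the notebook, schema_mismatches(pga.dtypes, expected) would flag "Value" before any analysis relies on it.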


### 3) Specify the Schema during the Read-In

The output above shows that Spark inferred the "Value" column as a string rather than an integer. This time, let us specify the schema explicitly so that Spark knows what type each column should have.
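Why would Spark pick string? CSV schema inference widens each column's type until every value fits, so a single value that can only be read as text (a hypothetical "1,234", say) turns the whole column into a string. We have not checked which "Value" entries cause it here. The sketch below is a deliberately simplified model (only int, double and string; Spark also considers long, decimal, timestamp and boolean), and infer_column_type is a hypothetical helper, not Spark's code.

```python
import re
from typing import Iterable, Optional

# Simplified model of CSV type inference: int -> double -> string only.
INT_RE = re.compile(r"[+-]?\d+")
DOUBLE_RE = re.compile(r"[+-]?(\d+\.?\d*|\.\d+)([eE][+-]?\d+)?")
RANK = {"int": 0, "double": 1, "string": 2}


def token_type(token: str) -> str:
    """Narrowest type that can hold a single token."""
    if INT_RE.fullmatch(token):
        return "int"
    if DOUBLE_RE.fullmatch(token):
        return "double"
    return "string"


def infer_column_type(tokens: Iterable[Optional[str]]) -> str:
    """Widen the column type until every non-empty token fits."""
    result = None
    for token in tokens:
        if token is None or token == "":
            continue  # empty fields do not influence the type
        kind = token_type(token)
        if result is None or RANK[kind] > RANK[result]:
            result = kind
        if result == "string":
            break  # nothing is wider than string
    return result or "string"  # an all-empty column falls back to string


print(infer_column_type(["71", "77", "83"]))        # int
print(infer_column_type(["71", "290.5"]))           # double
print(infer_column_type(["71", "290.5", "1,234"]))  # string
```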

In [18]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [19]:
# Each StructField is (column name, data type, nullable)
data_schema = [StructField("Player Name", StringType(), True),
               StructField("Season", IntegerType(), True),
               StructField("Statistic", StringType(), True),
               StructField("Variable", StringType(), True),
               StructField("Value", IntegerType(), True)]

In [20]:
final_struct = StructType(fields = data_schema)

In [21]:
# Now we can specify our custom schema while reading the data
path = 'C:/Users/kava2/Documents/Udemy/Spark_Datasets/'

pga = spark.read.csv(path + 'pga_tour_historical.csv', schema = final_struct)

In [23]:
pga.printSchema()
# That's better! "Value" is now an integer column.

root
 |-- Player Name: string (nullable = true)
 |-- Season: integer (nullable = true)
 |-- Statistic: string (nullable = true)
 |-- Variable: string (nullable = true)
 |-- Value: integer (nullable = true)

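One thing to watch in the In [21] read: it passes schema=final_struct but not header=True. Supplying a schema does not skip the first line of the file, so the header line is read as a data row; since the texts "Season" and "Value" cannot be parsed as integers, that row holds nulls in both columns (it is the null row at the top of df.show(5) in section 7). As an alternative to building a StructType, DataFrameReader.csv also accepts the schema as a DDL string. The helper to_ddl below is a hypothetical convenience, not part of PySpark; it backtick-quotes every name so that "Player Name", with its space, is a valid identifier.

```python
from typing import List, Tuple


def to_ddl(fields: List[Tuple[str, str]]) -> str:
    """Build a DDL schema string such as "`a` INT, `b` STRING".

    Names are wrapped in backticks (a literal backtick is doubled),
    so names containing spaces are allowed.
    """
    parts = []
    for name, sql_type in fields:
        quoted = "`" + name.replace("`", "``") + "`"
        parts.append(f"{quoted} {sql_type}")
    return ", ".join(parts)


pga_fields = [("Player Name", "STRING"), ("Season", "INT"),
              ("Statistic", "STRING"), ("Variable", "STRING"),
              ("Value", "INT")]
ddl = to_ddl(pga_fields)
print(ddl)
# `Player Name` STRING, `Season` INT, `Statistic` STRING, `Variable` STRING, `Value` INT
```

The string can then stand in for final_struct, together with header=True: `pga = spark.read.csv(path + 'pga_tour_historical.csv', schema=ddl, header=True)`.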


### 4) Generate Summary Statistics for Only One Variable

Let us generate summary statistics (count, mean, stddev, min and max) for only the "Value" column using the describe() method.

In [27]:
pga.describe(['Value']).show()

+-------+------------------+
|summary|             Value|
+-------+------------------+
|  count|           1657247|
|   mean|12494.388998743096|
| stddev|157274.75673570728|
|    min|              -178|
|    max|           3564954|
+-------+------------------+
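Two rules behind this table are easy to miss: every statistic ignores null values, and stddev is the sample standard deviation (dividing by n - 1). The plain-Python sketch below reproduces those semantics on a small list; it illustrates what the numbers mean, not how Spark computes them in a distributed way. describe_column is a hypothetical helper, and the sample values are the first five "Value" entries shown earlier plus one null.

```python
import math
from typing import Dict, Iterable, Optional


def describe_column(values: Iterable[Optional[float]]) -> Dict[str, Optional[float]]:
    """count, mean, stddev, min and max over the non-null values only."""
    present = [v for v in values if v is not None]
    n = len(present)
    if n == 0:
        return {"count": 0, "mean": None, "stddev": None, "min": None, "max": None}
    mean = sum(present) / n
    # Sample standard deviation (n - 1 in the denominator); it is undefined
    # for a single value, which this sketch reports as None.
    stddev = (math.sqrt(sum((v - mean) ** 2 for v in present) / (n - 1))
              if n > 1 else None)
    return {"count": n, "mean": mean, "stddev": stddev,
            "min": min(present), "max": max(present)}


stats = describe_column([71, 77, 83, 54, 100, None])
print(stats)  # count is 5, not 6: the None is skipped
```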



### 5) Generate Summary Statistics for Two Variables

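Now let us generate only the count, min and max for both the "Value" and "Season" variables by calling select() followed by summary(). describe() does not work for this one: it takes the columns to describe but always computes all five statistics, whereas summary() takes the list of statistics to compute.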

In [28]:
pga.select('Value', 'Season').summary('count', 'min', 'max').show()

+-------+-------+-------+
|summary|  Value| Season|
+-------+-------+-------+
|  count|1657247|2740403|
|    min|   -178|   2010|
|    max|3564954|   2018|
+-------+-------+-------+
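Notice that the two counts differ: "Season" has 2,740,403 non-null values but "Value" has only 1,657,247, a gap of 1,083,156 rows. count() counts non-null values only, and under the default PERMISSIVE read mode a field that cannot be converted to the type declared in the schema is set to null instead of failing the read. The gap therefore presumably consists of "Value" entries that are not plain integers; this notebook does not check which ones. The sketch below models that behaviour in plain Python; parse_int_permissive and count_parsed are hypothetical helpers, and the sample tokens are made up.

```python
import re
from typing import Iterable, Optional, Tuple

INT_RE = re.compile(r"[+-]?\d+")
INT_MIN, INT_MAX = -2**31, 2**31 - 1  # IntegerType is a 32-bit signed integer


def parse_int_permissive(token: Optional[str]) -> Optional[int]:
    """Return the integer, or None when the text is not a plain 32-bit integer."""
    if token is None or not INT_RE.fullmatch(token):
        return None
    value = int(token)
    return value if INT_MIN <= value <= INT_MAX else None


def count_parsed(tokens: Iterable[Optional[str]]) -> Tuple[int, int]:
    """(non-null count, null count) after permissive parsing, as count() sees it."""
    parsed = [parse_int_permissive(t) for t in tokens]
    non_null = sum(1 for p in parsed if p is not None)
    return non_null, len(parsed) - non_null


# Hypothetical raw tokens: two plain integers, a decimal and a comma-formatted number
print(count_parsed(["71", "290.5", "1,234", "100"]))  # (2, 2)
```

To make the read fail loudly instead, Spark's CSV reader accepts `mode="FAILFAST"`, for example `spark.read.csv(path + 'pga_tour_historical.csv', schema=final_struct, header=True, mode="FAILFAST")`, which raises an error when it meets a malformed record.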



### 6) Writing a Parquet File

Now let us write a (non-partitioned) Parquet file from the pga dataset.

First, we create a new DataFrame containing only the "Season" and "Value" fields (using the select command), and then we write it out as Parquet.

In [29]:
df = pga.select('Season', 'Value')

df.write.mode("overwrite").parquet('partition_parquet/')

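Note that writing to Parquet fails (at least in older Spark versions) when a column name contains a space or any of the characters `,;{}()=`, a tab or a newline, so writing pga directly would fail because of "Player Name". That is why we select only the "Season" and "Value" fields here; ideally, we would rename the offending columns before writing.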
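A sketch of that renaming step: the hypothetical helper to_snake_case replaces each run of the rejected characters with an underscore and lowercases the result. It is plain Python so it can be tested on its own; the column list is the one printed in section 2.

```python
import re

# Characters rejected in Parquet column names by older Spark versions:
# space , ; { } ( ) newline tab =
INVALID_PARQUET_CHARS = re.compile(r"[ ,;{}()\n\t=]+")


def to_snake_case(name: str) -> str:
    """Replace each run of rejected characters with '_' and lowercase the result."""
    return INVALID_PARQUET_CHARS.sub("_", name.strip()).lower()


columns = ['Player Name', 'Season', 'Statistic', 'Variable', 'Value']
print([to_snake_case(c) for c in columns])
# ['player_name', 'season', 'statistic', 'variable', 'value']
```

Applied to a DataFrame, the new names can be passed to toDF(), which returns a copy with every column renamed: `pga_clean = pga.toDF(*[to_snake_case(c) for c in pga.columns])`. pga_clean could then be written to Parquet in full.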

### 7) Write a Partitioned Parquet File

Now let us write a partitioned Parquet file, partitioned by the "Season" column.

In [30]:
df.write.mode("overwrite").partitionBy("Season").parquet("partitioned_parquet/")
df.show(5)

+------+-----+
|Season|Value|
+------+-----+
|  null| null|
|  2010|   71|
|  2010|   77|
|  2010|   83|
|  2010|   54|
+------+-----+
only showing top 5 rows
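partitionBy("Season") writes one sub-directory per distinct Season value, named Season=<value>, and the Season values themselves live in those directory names rather than inside the Parquet files. A null partition value is written to a directory called Season=__HIVE_DEFAULT_PARTITION__, which is where the null row at the top of the output above ends up. The plain-Python sketch below (partition_dirs is a hypothetical helper) lists the directory names such a write would produce for a handful of values.

```python
from typing import Iterable, List, Optional

# Directory name Spark uses for null partition values
HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_dirs(column: str, values: Iterable[Optional[object]]) -> List[str]:
    """Distinct Hive-style directory names for partitionBy(column).

    Sorted here only for a stable display; Spark does not sort them.
    """
    names = {f"{column}={HIVE_DEFAULT_PARTITION if v is None else v}" for v in values}
    return sorted(names)


print(partition_dirs("Season", [None, 2010, 2010, 2011, 2018]))
# ['Season=2010', 'Season=2011', 'Season=2018', 'Season=__HIVE_DEFAULT_PARTITION__']
```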



### 8) Read-In a Partitioned Parquet File

Now let us read back the partitioned Parquet data we just wrote.

In [31]:
# Note: with a glob such as "partitioned_parquet/*", each Season=... directory
# becomes a root path and the Season column is dropped (unless basePath is set)
path = "partitioned_parquet/"
parquet = spark.read.parquet(path)

parquet.show()

+-----+------+
|Value|Season|
+-----+------+
|   71|  2010|
|   77|  2010|
|   83|  2010|
|   54|  2010|
|  100|  2010|
|   63|  2010|
|   88|  2010|
|   64|  2010|
|   64|  2010|
|   92|  2010|
|   75|  2010|
|   54|  2010|
|   76|  2010|
|   94|  2010|
|   82|  2010|
|   85|  2010|
|   79|  2010|
|   89|  2010|
|   88|  2010|
|   91|  2010|
+-----+------+
only showing top 20 rows
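When reading, Spark reverses that layout: it walks the directories under the path it was given, parses each key=value directory name into a partition column, and appends those columns after the data columns, which is why "Season" now comes after "Value". Spark also infers a type for the parsed values (integer here), which the sketch below skips. The helper discover_partition_values and the file name are hypothetical; the point is that only the directories below the base path are parsed.

```python
from typing import Dict


def discover_partition_values(base_path: str, file_path: str) -> Dict[str, str]:
    """Parse key=value directory names that sit between base_path and the file."""
    if not file_path.startswith(base_path):
        raise ValueError("file_path must be inside base_path")
    relative = file_path[len(base_path):]
    directories = [s for s in relative.split("/") if s][:-1]  # drop the file name
    values = {}
    for segment in directories:
        if "=" in segment:
            key, value = segment.split("=", 1)
            values[key] = value
    return values


file_path = "partitioned_parquet/Season=2010/part-00000.snappy.parquet"  # hypothetical name
print(discover_partition_values("partitioned_parquet/", file_path))
# {'Season': '2010'}
print(discover_partition_values("partitioned_parquet/Season=2010/", file_path))
# {}  (the Season directory is the base itself, so no partition column)
```

The second call mirrors section 9, where each Season=... directory is passed as the path itself.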



### 9) Reading-In a Set of Partitioned Parquet Files

Now we shall only read Seasons 2010, 2011 and 2012.

In [32]:
# Passing the Season=... directories directly makes each of them a root path,
# so Spark finds no partition directories below them: only "Value" comes back
path = "partitioned_parquet/"
partitioned = spark.read.parquet(path + 'Season=2010/',
                                 path + 'Season=2011/',
                                 path + 'Season=2012/')

partitioned.show(5)

+-----+
|Value|
+-----+
|   71|
|   77|
|   83|
|   54|
|  100|
+-----+
only showing top 5 rows



In [33]:
# basePath tells Spark where the partitioned layout starts, so it parses
# Season=... from the directory names and keeps the "Season" column
path = "partitioned_parquet/"
dataframe = (spark.read.option("basePath", path)
             .parquet(path + 'Season=2010/',
                      path + 'Season=2011/',
                      path + 'Season=2012/'))
dataframe.show(5)

+-----+------+
|Value|Season|
+-----+------+
|   71|  2010|
|   77|  2010|
|   83|  2010|
|   54|  2010|
|  100|  2010|
+-----+------+
only showing top 5 rows
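Listing the directories by hand is not the only way to read a subset. Another option, sketched here rather than run in this notebook, is to read the whole partitioned folder and filter on the partition column: `spark.read.parquet("partitioned_parquet/").where("Season BETWEEN 2010 AND 2012")`. Because "Season" is a partition column, Spark can use that filter to skip the other Season=... directories entirely (partition pruning), and no basePath is needed. The plain-Python sketch below models only the directory-selection step; prune_partitions is a hypothetical helper.

```python
from typing import Callable, List


def prune_partitions(dirs: List[str], column: str,
                     keep: Callable[[int], bool]) -> List[str]:
    """Keep the column=value directories whose integer value satisfies keep().

    Directories whose value is not an integer (e.g. the null partition) are
    dropped, just as a SQL comparison with null is never true.
    """
    prefix = column + "="
    selected = []
    for d in dirs:
        if not d.startswith(prefix):
            continue
        raw = d[len(prefix):]
        if raw.isdigit() and keep(int(raw)):
            selected.append(d)
    return selected


dirs = [f"Season={year}" for year in range(2010, 2019)]
dirs.append("Season=__HIVE_DEFAULT_PARTITION__")
print(prune_partitions(dirs, "Season", lambda s: 2010 <= s <= 2012))
# ['Season=2010', 'Season=2011', 'Season=2012']
```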



### 10) Create your Own DataFrame

We can use spark.createDataFrame() to build our own DataFrame from a list of Python tuples plus a list of column names.

In [34]:
values = [('Kyle',10,'A',1),('Melbourne',36,'A',1),('Nina',123,'A',1),('Stephen',48,'B',2),('Orphan',16,'B',2),('Imran',1,'B',2)]
df = spark.createDataFrame(values,['name','age','AB','Number'])
df.show()

+---------+---+---+------+
|     name|age| AB|Number|
+---------+---+---+------+
|     Kyle| 10|  A|     1|
|Melbourne| 36|  A|     1|
|     Nina|123|  A|     1|
|  Stephen| 48|  B|     2|
|   Orphan| 16|  B|     2|
|    Imran|  1|  B|     2|
+---------+---+---+------+
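Here Spark infers the column types from the Python values, and Python ints become bigint (LongType) columns. To pin the types down, createDataFrame() also accepts a DDL string as the schema, for example `spark.createDataFrame(values, "name STRING, age INT, AB STRING, Number INT")`, and by default (verifySchema=True) it then checks each row against that schema and raises an error on a mismatch. The plain-Python sketch below imitates that kind of check for this small example; validate_rows and its type mapping are hypothetical and cover only STRING and INT.

```python
from typing import Any, List, Sequence, Tuple

# Hypothetical mapping from the DDL types used here to accepted Python types.
PY_TYPES = {"STRING": str, "INT": int}


def validate_rows(rows: Sequence[Tuple[Any, ...]],
                  schema: List[Tuple[str, str]]) -> List[str]:
    """Return one message per field whose value does not match the schema."""
    errors = []
    for i, row in enumerate(rows):
        if len(row) != len(schema):
            errors.append(f"row {i}: expected {len(schema)} fields, got {len(row)}")
            continue
        for (name, sql_type), value in zip(schema, row):
            expected = PY_TYPES[sql_type]
            # bool is a subclass of int in Python, so reject it explicitly
            if value is not None and (isinstance(value, bool)
                                      or not isinstance(value, expected)):
                errors.append(f"row {i}: {name}={value!r} is not {sql_type}")
    return errors


schema = [("name", "STRING"), ("age", "INT"), ("AB", "STRING"), ("Number", "INT")]
values = [('Kyle', 10, 'A', 1), ('Melbourne', 36, 'A', 1), ('Nina', 123, 'A', 1),
          ('Stephen', 48, 'B', 2), ('Orphan', 16, 'B', 2), ('Imran', 1, 'B', 2)]
print(validate_rows(values, schema))                    # []
print(validate_rows([('Kyle', '10', 'A', 1)], schema))  # ["row 0: age='10' is not INT"]
```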

