# Reading, Writing and Validating Data in PySpark

Welcome to PySpark!

In this first lecture, we will be covering:

 - Reading in Data
 - Partitioned Files
 - Validating Data
 - Specifying Data Types
 - Writing Data

Below you will see the script to begin your first PySpark instance. If you're ever curious about how your PySpark instance is performing, Spark offers a neat Web UI with tons of information. Just navigate to http://[driver]:4040 in your browser, where "driver" is your driver's host name. If you are running PySpark locally, that is http://localhost:4040, or you can use the hyperlink automatically produced by the script below.

In [4]:
# First let's create our PySpark instance!

# PC users can use the next two lines of code but mac users don't need it
import findspark

findspark.init()

import pyspark  # only run after findspark.init()
from pyspark.sql import SparkSession

# May take awhile locally
spark = SparkSession.builder.appName("ReadWriteVal").master("local[4]").getOrCreate()

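# Note: this counts the entries in the executor status map, not CPU cores,
# so in local mode it reports 1 even with local[4]
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()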
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)
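
The master string `local[4]` used above requests four worker threads. As a minimal sketch, that string can be derived from the machine's CPU count instead of being hard-coded. The helper name `local_master` is hypothetical and not part of PySpark.

```python
import os


def local_master(max_threads=4):
    """Build a Spark master string such as 'local[4]', capped at the CPUs available."""
    available = os.cpu_count() or 1  # os.cpu_count() may return None
    return f"local[{min(max_threads, available)}]"


print(local_master())

# Assumed usage with the session builder from the cell above:
#   spark = SparkSession.builder.appName("ReadWriteVal").master(local_master()).getOrCreate()
#   spark.sparkContext.defaultParallelism  # number of threads Spark uses in local mode
```

Capping at the available CPUs is only one possible choice; asking for more threads than cores is allowed but rarely helps.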


## Reading data

A DataFrame is equivalent to a relational table in Spark SQL, and can be created using various functions in SparkSession.

First let's try reading in a csv file containing a list of students and their grades.

**Source:** https://www.kaggle.com/spscientist/students-performance-in-exams

In [5]:
# Start by reading a basic csv dataset
# Let Spark know about the header and infer the Schema types!

path = "Datasets/"

# Some csv data
students = spark.read.csv(path + 'students.csv', inferSchema=True, header=True)

**Parquet Files**

Now try reading in a parquet file. Parquet is one of the most common file formats in the big data world.
Why? Because it is a columnar format with built-in compression, so it is usually much more compact than text formats such as csv.

In [12]:
parquet = spark.read.parquet(path + 'users1.parquet')
parquet.show(2)
parquet.count()

+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|  registration_dttm| id|first_name|last_name|           email|gender|    ip_address|              cc|  country|birthdate|   salary|           title|comments|
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|2016-02-03 08:55:29|  1|    Amanda|   Jordan|ajordan0@com.com|Female|   1.197.201.2|6759521864920116|Indonesia| 3/8/1971| 49756.53|Internal Auditor|   1E+02|
|2016-02-03 18:04:03|  2|    Albert|  Freeman| afreeman1@is.gd|  Male|218.111.175.34|                |   Canada|1/16/1968|150280.17|   Accountant IV|        |
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
only showing top 2 rows



1000

**Partitioned Parquet Files**

In practice, most big datasets are partitioned, meaning they are split across several files. Here is how you can collect all the pieces (parts) of the dataset in one simple command.

In [16]:
from pyspark.sql.functions import col

# Read every file matching users* and drop the registration_dttm column
partitioned = spark.read.parquet(path + 'users*').drop("registration_dttm")
partitioned.show(2)
partitioned.count()
partitioned.limit(5).toPandas()

+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
| id|first_name|last_name|           email|gender|    ip_address|              cc|  country|birthdate|   salary|           title|comments|
+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|  1|    Amanda|   Jordan|ajordan0@com.com|Female|   1.197.201.2|6759521864920116|Indonesia| 3/8/1971| 49756.53|Internal Auditor|   1E+02|
|  2|    Albert|  Freeman| afreeman1@is.gd|  Male|218.111.175.34|                |   Canada|1/16/1968|150280.17|   Accountant IV|        |
+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
only showing top 2 rows



|   | id | first_name | last_name | email | gender | ip_address | cc | country | birthdate | salary | title | comments |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Amanda | Jordan | ajordan0@com.com | Female | 1.197.201.2 | 6759521864920116.0 | Indonesia | 3/8/1971 | 49756.53 | Internal Auditor | 100.0 |
| 1 | 2 | Albert | Freeman | afreeman1@is.gd | Male | 218.111.175.34 | | Canada | 1/16/1968 | 150280.17 | Accountant IV | |
| 2 | 3 | Evelyn | Morgan | emorgan2@altervista.org | Female | 7.161.136.94 | 6767119071901597.0 | Russia | 2/1/1960 | 144972.51 | Structural Engineer | |
| 3 | 4 | Denise | Riley | driley3@gmpg.org | Female | 140.35.109.83 | 3576031598965625.0 | China | 4/8/1997 | 90263.05 | Senior Cost Accountant | |
| 4 | 5 | Carlos | Burns | cburns4@miitbeian.gov.cn | | 169.113.235.40 | 5602256255204850.0 | South Africa | | | | |
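
The `users*` pattern above is a glob: every file whose name starts with `users` is read. As a rough illustration, Python's `fnmatch` module can preview which names such a pattern would pick up. The directory listing here is a hypothetical stand-in for the `Datasets/` folder, and Hadoop's glob syntax is similar to `fnmatch` for a simple `*` but is not identical in general.

```python
from fnmatch import fnmatchcase

# Hypothetical listing of the Datasets/ folder used in this lecture
files = ["people.json", "students.csv", "users1.parquet", "users2.parquet", "users3.parquet"]

# Keep only the names that the users* pattern would match
matched = [name for name in files if fnmatchcase(name, "users*")]
print(matched)
```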


You can also opt to read in only a specific set of partitioned parquet files. Say, for example, that you only wanted users1 and users2 and not users3.

In [17]:
# The .option("basePath", path) setting tells Spark where partition discovery starts.
# Without it, Spark can leave the partition column out of the resulting dataframe.
# I personally prefer to keep the partitioning info in my new dataframe.
users1_2 = spark.read.option("basePath", path).parquet(path + 'users1.parquet', path + 'users2.parquet')
users1_2.show()

+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|  registration_dttm| id|first_name|last_name|               email|gender|     ip_address|                 cc|             country| birthdate|   salary|               title|            comments|
+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|2016-02-03 08:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|    1.197.201.2|   6759521864920116|           Indonesia|  3/8/1971| 49756.53|    Internal Auditor|               1E+02|
|2016-02-03 18:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male| 218.111.175.34|                   |              Canada| 1/16/1968|150280.17|       Accountant IV|                    |
|2016-02-03 02:09:31|  3|
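
The `basePath` option matters most when the data sits in `key=value` directories, because only directory segments below the base path are turned into columns. The following stdlib-only sketch mimics that idea. The function `partition_columns` and the example path are hypothetical, and Spark's real partition discovery does more, such as inferring column types.

```python
from pathlib import PurePosixPath


def partition_columns(base_path, file_path):
    """Return the key=value directory segments that lie between base_path and the file."""
    relative = PurePosixPath(file_path).relative_to(PurePosixPath(base_path))
    columns = {}
    for segment in relative.parts[:-1]:  # skip the file name itself
        if "=" in segment:
            key, value = segment.split("=", 1)
            columns[key] = value
    return columns


# Hypothetical file inside a partitioned table
file_path = "Datasets/Table1/CREATED_YEAR=2015/part-00000.parquet"

# Base path at the table root: the partition directory is visible
print(partition_columns("Datasets/Table1", file_path))
# Base path at the partition directory itself: nothing is left to discover
print(partition_columns("Datasets/Table1/CREATED_YEAR=2015", file_path))
```

With the table root as base path the result contains `CREATED_YEAR`, and with the partition directory as base path it is empty, which is the situation the `basePath` option is meant to avoid.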

#### If you're in AWS storing data in s3 buckets, your code will look more like this...

In [6]:
bucket = "my_bucket"
key1 = "partition_test/Table1/CREATED_YEAR=2015/*"
key2 = "partition_test/Table1/CREATED_YEAR=2017/*"
key3 = "partition_test/Table1/CREATED_YEAR=2018/*"

test_df = spark.read.parquet('s3://' + bucket + '/' + key1,
                             's3://' + bucket + '/' + key2,
                             's3://' + bucket + '/' + key3)

test_df.show(1)

Py4JJavaError: An error occurred while calling o52.parquet.
: java.io.IOException: No FileSystem for scheme: s3
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:644)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
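
The error above (`No FileSystem for scheme: s3`) means this Spark installation has no filesystem registered for the `s3` scheme, which is expected when the cell is run outside AWS. On a plain Apache Spark install, S3 is usually reached through the `hadoop-aws` connector, whose scheme is `s3a`. The sketch below only builds the paths. The helper name `s3_partition_paths` is hypothetical, and the default `s3a` scheme is an assumption about your environment.

```python
def s3_partition_paths(bucket, table_prefix, years, scheme="s3a"):
    """Build one glob path per CREATED_YEAR partition of a table."""
    return [f"{scheme}://{bucket}/{table_prefix}/CREATED_YEAR={year}/*" for year in years]


paths = s3_partition_paths("my_bucket", "partition_test/Table1", [2015, 2017, 2018])
for p in paths:
    print(p)

# Assumed usage, given a session that has an S3 connector configured:
#   test_df = spark.read.parquet(*paths)
```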


## Validating Data

Next you will want to validate that your dataframe was read in correctly. We will get into more detailed data evaluation later on, but first we need to ensure that all the variable types were inferred correctly and that the values actually made it in... sometimes they don't :)

In [18]:
# Get an initial view of your dataframe
students.show(3)

+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|   lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|standard|                   none|        72|           72|           74|
|female|       group C|               some college|standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|standard|                   none|        90|           95|           93|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
only showing top 3 rows



In [6]:
# If your dataframe is more than just a few variables, this method is way better
students.limit(5).toPandas()

|   | gender | race/ethnicity | parental level of education | lunch | test preparation course | math score | reading score | writing score |
|---|---|---|---|---|---|---|---|---|
| 0 | female | group B | bachelor's degree | standard | none | 72 | 72 | 74 |
| 1 | female | group C | some college | standard | completed | 69 | 90 | 88 |
| 2 | female | group B | master's degree | standard | none | 90 | 95 | 93 |
| 3 | male | group A | associate's degree | free/reduced | none | 47 | 57 | 44 |
| 4 | male | group C | some college | standard | none | 76 | 78 | 75 |


In [19]:
# Note the types here:
print(type(students))
studentsPdf = students.toPandas()
print(type(studentsPdf))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>


In [20]:
# A solid summary of your data:

# printSchema() prints the schema itself and returns None, so call it directly
students.printSchema()
print("")
print(students.columns)
print("")
# Printing describe() shows only its column types; add .show() to see the statistics
print(students.describe())  # Not so fond of this one but to each their own

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)

['gender', 'race/ethnicity', 'parental level of education', 'lunch', 'test preparation course', 'math score', 'reading score', 'writing score']

DataFrame[summary: string, gender: string, race/ethnicity: string, parental level of education: string, lunch: string, test preparation course: string, math score: string, reading score: string, writing score: string]


In [21]:
# If you need to get the type of just ONE column by name you can use this function:
students.schema['math score'].dataType

IntegerType
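
Checking one column at a time gets tedious on wide tables. A small sketch of a bulk check follows, assuming you feed it the `(column, type)` pairs that `students.dtypes` returns. The helper `dtype_mismatches` and the expected types are hypothetical and only illustrate the idea.

```python
def dtype_mismatches(dtypes, expected):
    """Compare (column, type) pairs, as returned by DataFrame.dtypes, with expected types.

    Returns {column: (expected_type, actual_type)} for every column that differs.
    """
    actual = dict(dtypes)
    return {
        column: (wanted, actual.get(column))
        for column, wanted in expected.items()
        if actual.get(column) != wanted
    }


# A few of the pairs students.dtypes would give for the schema printed earlier
dtypes = [("gender", "string"), ("math score", "int"), ("reading score", "int")]

# Hypothetical expectation: suppose reading score was supposed to be a double
expected = {"gender": "string", "math score": "int", "reading score": "double"}

print(dtype_mismatches(dtypes, expected))
```

An empty result means every listed column was read with the type you expected; a column that is missing from the dataframe shows up with `None` as its actual type.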

In [22]:
# Neat "describe" function
students.describe(['math score']).show()

+-------+------------------+
|summary|        math score|
+-------+------------------+
|  count|              1000|
|   mean|            66.089|
| stddev|15.163080096009454|
|    min|                 0|
|    max|               100|
+-------+------------------+



In [26]:
# Summary function
students.select("math score", "reading score", "writing score").summary("count", "min", "25%", "75%", "max").show()

+-------+----------+-------------+-------------+
|summary|math score|reading score|writing score|
+-------+----------+-------------+-------------+
|  count|      1000|         1000|         1000|
|    min|         0|           17|           10|
|    25%|        57|           59|           57|
|    75%|        77|           79|           79|
|    max|       100|          100|          100|
+-------+----------+-------------+-------------+



## Specifying Data Types

Some file formats make it easy to infer the schema (tabular formats such as csv, as shown earlier with inferSchema=True).

However, you often have to set the schema yourself when the .read method you are using doesn't have an inferSchema option built in.

Spark has all the tools you need for this; it just requires a very specific structure:

In [27]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType

Next we need to create the list of StructFields. Each StructField takes:

 - name: string, name of the field.
 - dataType: the DataType of the field.
 - nullable: boolean, whether the field can be null (None) or not.

In [28]:
data_schema = [StructField("name", StringType(), True),
               StructField("email", StringType(), True),
               StructField("city", StringType(), True),
               StructField("mac", StringType(), True),
               StructField("timestamp", DateType(), True),
               StructField("creditcard", StringType(), True)]

In [29]:
final_struc = StructType(fields=data_schema)

We'll do a .json file this time :) 

**Source:** https://gist.github.com/raine/da15845f332a2fb8937b344504abfbe0

In [30]:
people = spark.read.json(path + 'people.json', schema=final_struc)

In [31]:
people.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- timestamp: date (nullable = true)
 |-- creditcard: string (nullable = true)
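
The `schema` argument of the reader also accepts a DDL-formatted string, which can be shorter to write than a list of StructFields. The sketch below builds such a string for the same six fields. The variable names are illustrative, and the backtick quoting is a cautious choice rather than a requirement.

```python
# (name, type) pairs mirroring the StructFields defined above
fields = [
    ("name", "STRING"),
    ("email", "STRING"),
    ("city", "STRING"),
    ("mac", "STRING"),
    ("timestamp", "DATE"),
    ("creditcard", "STRING"),
]

# Backticks quote the names so that words like timestamp are read as identifiers
ddl_schema = ", ".join(f"`{name}` {dtype}" for name, dtype in fields)
print(ddl_schema)

# Assumed usage with the reader from this lecture:
#   people = spark.read.json(path + 'people.json', schema=ddl_schema)
```

Fields declared this way are nullable, which matches the `True` passed to each StructField above.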



## Writing Data

First let's just try writing a simple csv file.

In [32]:
# Spark creates a folder called write_test.csv and puts part- files inside it. There is no way to directly change these file names.
students.write.mode("overwrite").csv('write_test.csv')

Note the strange naming convention of the output file in the path that you specified. Spark writes its output through Hadoop's file output API, which produces one part- file per partition of the dataframe inside a folder with the name you gave. If you want to rename your written files to a more user-friendly format, you can do that using the method below:

In [33]:
from py4j.java_gateway import java_import

java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
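# Take the first part- file that Spark wrote into the output folder
part_file = fs.globStatus(spark._jvm.Path('write_test.csv/part*'))[0].getPath().getName()
# Move it to a friendlier name; the source and destination paths need to be different
fs.rename(spark._jvm.Path('write_test.csv/' + part_file),
          spark._jvm.Path('write_test2.csv'))
# Remove the leftover output folder (True = recursive)
fs.delete(spark._jvm.Path('write_test.csv'), True)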

True
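
When the output lands on the local filesystem, the same rename can be done with the Python standard library instead of the Hadoop API. This is a minimal sketch under that assumption. It will not work for HDFS or S3 paths, and the helper name `promote_single_part_file` is hypothetical.

```python
import glob
import os
import shutil


def promote_single_part_file(output_dir, target_file):
    """Move the only part-* file out of a Spark output folder, then remove the folder."""
    parts = glob.glob(os.path.join(output_dir, "part-*"))
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file, found {len(parts)}")
    shutil.move(parts[0], target_file)
    shutil.rmtree(output_dir)  # also removes _SUCCESS and the hidden checksum files
    return target_file


# Assumed usage: coalesce(1) makes Spark write a single part file first
#   students.coalesce(1).write.mode("overwrite").csv('write_test.csv')
#   promote_single_part_file('write_test.csv', 'write_test2.csv')
```

The explicit check for exactly one part file is a design choice: with several partitions, silently keeping only the first file would drop data.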

#### Writing Parquet files

Now let's try writing a parquet file. Parquet is a common choice for big data because its compressed, columnar layout is compact on disk.

In [34]:
users1_2.write.mode("overwrite").parquet('parquet/')

If you got an error attempting to run the above code, try this solution: https://stackoverflow.com/questions/59220832/unable-to-write-spark-dataframe-to-a-parquet-file-format-to-c-drive-in-pyspark

#### Writing Partitioned Parquet Files

Now try to write a partitioned parquet file... super fun!

In [35]:
users1_2.write.mode("overwrite").partitionBy("gender").parquet('part_parquet/')
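
`partitionBy("gender")` writes one subdirectory per distinct value of the column, named `gender=<value>`, with the part files inside. The sketch below summarizes such a layout on the local filesystem. The helper `partition_layout` is hypothetical and assumes the output folder is a local path.

```python
from pathlib import Path


def partition_layout(root):
    """Map each key=value subdirectory of root to the number of part files inside it."""
    layout = {}
    for child in sorted(Path(root).iterdir()):
        if child.is_dir() and "=" in child.name:
            layout[child.name] = len(list(child.glob("part-*")))
    return layout


# Assumed usage after the write above:
#   partition_layout('part_parquet')
```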

#### Creating your own dataframes here!

You can also create your own dataframes directly here in your Jupyter Notebook if you want.

Like this!

In [36]:
values = [('Pear', 10), ('Orange', 36), ('Banana', 123), ('Kiwi', 48), ('Peach', 16), ('Strawberry', 1)]
df = spark.createDataFrame(values, ['fruit', 'quantity'])
df.show()

+----------+--------+
|     fruit|quantity|
+----------+--------+
|      Pear|      10|
|    Orange|      36|
|    Banana|     123|
|      Kiwi|      48|
|     Peach|      16|
|Strawberry|       1|
+----------+--------+



#### That's it! Great job