### Spark DataFrame

#### Brief history
- Spark originally stored data in Resilient Distributed Datasets (RDDs), whose API many users found unintuitive.

- Since Spark 2.0, the DataFrame API has been the primary interface and is easier to work with. The RDD API is still available, and DataFrames are built on top of RDDs.

#### Reading in DataFrames

- Supports common formats such as JSON files, Parquet files, and Hive tables

- Reads from the local file system, HDFS, cloud storage (S3), or an external relational database (RDB)

#### Partitioning data in PySpark

- PySpark has built-in support for reading and writing "partitioned" datasets, a preferred storage layout that splits a dataset into multiple parts.

- For big data management, the preferred file format is Parquet, a compressed columnar format that is usually much smaller than the equivalent CSV.

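#### Showing results vs. collecting results

- **show()** prints a preview of the first rows (like pandas.DataFrame.head() in Python)

- **collect()**
    - returns every row of the DataFrame to the driver as a list of Row objects
    - more computationally expensive, and can exhaust driver memory on large datasets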

#### Spark User Defined Functions (UDFs)

- In PySpark, an ordinary Python function cannot be applied directly to a column of a distributed DataFrame.
- If your function is meant to run on every value in a DataFrame column, wrap it in a Spark UDF.


    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType

    def square_float(x):
        return float(x**2)

    # Wrap the function as a UDF and declare its return type
    square_udf_float2 = udf(square_float, FloatType())
    

        
    

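#### Spark User Defined Functions (UDFs) 2

    from pyspark.ml.feature import StringIndexer
    from pyspark.sql.types import StringType

    def Indexer(df, dependent_var):
        # Cast the dependent variable to a string, then index it as "label"
        renamed = df.withColumn("label_str", df[dependent_var].cast(StringType()))
        indexer = StringIndexer(inputCol="label_str", outputCol="label")
        indexed = indexer.fit(renamed).transform(renamed)
        return indexed

    # df and dependent_var are assumed to be defined earlier
    final_data = Indexer(df, dependent_var)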

### Reading and Writing in PySpark

**Objectives:**
- Reading in Data
- Partitioned Files
- Validating Data
- Specifying Data Types
- Writing Data

#### Configuration

In [1]:
import os
os.environ['HADOOP_HOME']=r"C:\encore_migo\spark\spark-3.0.3-bin-hadoop2.7"
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk1.8.0_291"

import sys 
sys.path.append(r"C:\encore_migo\spark\spark-3.0.3-bin-hadoop2.7")


In [2]:
import findspark
findspark.init(r"C:\encore_migo\spark\spark-3.0.3-bin-hadoop2.7")
# On Windows the Spark path is passed explicitly here; on macOS, findspark.init() with no argument can usually locate the local Spark install on its own



import pyspark
from pyspark.sql import SparkSession

# SparkSession is the entry-point object for the DataFrame API
spark = SparkSession.builder.appName("ReadWriteVal").getOrCreate() #Note that it is CamelCase
spark

In [3]:
# The "Spark UI" link in the output above opens the Spark jobs page,
# where you can check how long each job takes, among other details.


In [4]:
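# Counts the executors (block managers) registered with the driver, not CPU cores.
# In local mode this is 1; spark.sparkContext.defaultParallelism is closer to a core count.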
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
cores

1

#### DataLoad

In [5]:
# Folder that holds the example datasets
path ="data/Read_Write_Validate_Datasets/"

students = spark.read.csv(path + "students.csv",inferSchema=True,header=True)

# inferSchema=True asks Spark to scan the data and guess each column's type.
# The guess is often right but not always, and it costs an extra pass over the file.

#### Preview of data

In [6]:
# Take the first 4 rows and convert them to a pandas DataFrame
students.limit(4).toPandas()



|   | gender | race/ethnicity | parental level of education | lunch | test preparation course | math score | reading score | writing score |
|---|---|---|---|---|---|---|---|---|
| 0 | female | group B | bachelor's degree | standard | none | 72 | 72 | 74 |
| 1 | female | group C | some college | standard | completed | 69 | 90 | 88 |
| 2 | female | group B | master's degree | standard | none | 90 | 95 | 93 |
| 3 | male | group A | associate's degree | free/reduced | none | 47 | 57 | 44 |


#### How to read in Parquet files?

- Parquet is one of the most common file formats in the big data world.
- It is a compressed, columnar format, so files are typically much smaller than the same data stored as CSV.

In [7]:
parquet = spark.read.parquet(path+"users1.parquet")

In [8]:
parquet.limit(4).toPandas()

|   | registration_dttm | id | first_name | last_name | email | gender | ip_address | cc | country | birthdate | salary | title | comments |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2016-02-03 16:55:29 | 1 | Amanda | Jordan | ajordan0@com.com | Female | 1.197.201.2 | 6759521864920116.0 | Indonesia | 3/8/1971 | 49756.53 | Internal Auditor | 100.0 |
| 1 | 2016-02-04 02:04:03 | 2 | Albert | Freeman | afreeman1@is.gd | Male | 218.111.175.34 |  | Canada | 1/16/1968 | 150280.17 | Accountant IV |  |
| 2 | 2016-02-03 10:09:31 | 3 | Evelyn | Morgan | emorgan2@altervista.org | Female | 7.161.136.94 | 6767119071901597.0 | Russia | 2/1/1960 | 144972.51 | Structural Engineer |  |
| 3 | 2016-02-03 09:36:21 | 4 | Denise | Riley | driley3@gmpg.org | Female | 140.35.109.83 | 3576031598965625.0 | China | 4/8/1997 | 90263.05 | Senior Cost Accountant |  |


#### How to read several files at once?

- The user records are split across several files (users1.parquet, users2.parquet, ...)


In [9]:
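# Walk the directory tree and collect every file whose name starts with "users"

import os
lst = []
for folder_name, subfolders, filenames in os.walk(path):
    for i in filenames:
        if i.startswith("users"):
            # os.path.join also works for files found inside subfolders
            lst.append(os.path.join(folder_name, i))
lst.sort()  # os.walk order is not guaranteed, so sort before indexing into lst

users = spark.read.parquet(*lst)
users.limit(5).toPandas()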

|   | registration_dttm | id | first_name | last_name | email | gender | ip_address | cc | country | birthdate | salary | title | comments |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2016-02-03 16:55:29 | 1 | Amanda | Jordan | ajordan0@com.com | Female | 1.197.201.2 | 6759521864920116.0 | Indonesia | 3/8/1971 | 49756.53 | Internal Auditor | 100.0 |
| 1 | 2016-02-04 02:04:03 | 2 | Albert | Freeman | afreeman1@is.gd | Male | 218.111.175.34 |  | Canada | 1/16/1968 | 150280.17 | Accountant IV |  |
| 2 | 2016-02-03 10:09:31 | 3 | Evelyn | Morgan | emorgan2@altervista.org | Female | 7.161.136.94 | 6767119071901597.0 | Russia | 2/1/1960 | 144972.51 | Structural Engineer |  |
| 3 | 2016-02-03 09:36:21 | 4 | Denise | Riley | driley3@gmpg.org | Female | 140.35.109.83 | 3576031598965625.0 | China | 4/8/1997 | 90263.05 | Senior Cost Accountant |  |
| 4 | 2016-02-03 14:05:31 | 5 | Carlos | Burns | cburns4@miitbeian.gov.cn |  | 169.113.235.40 | 5602256255204850.0 | South Africa |  |  |  |  |


In [10]:
# Read only the first and third files in the list

user1_3 = spark.read.parquet(lst[0],lst[2])
user1_3.limit(3).toPandas()

|   | registration_dttm | id | first_name | last_name | email | gender | ip_address | cc | country | birthdate | salary | title | comments |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2016-02-03 16:55:29 | 1 | Amanda | Jordan | ajordan0@com.com | Female | 1.197.201.2 | 6759521864920116.0 | Indonesia | 3/8/1971 | 49756.53 | Internal Auditor | 100.0 |
| 1 | 2016-02-04 02:04:03 | 2 | Albert | Freeman | afreeman1@is.gd | Male | 218.111.175.34 |  | Canada | 1/16/1968 | 150280.17 | Accountant IV |  |
| 2 | 2016-02-03 10:09:31 | 3 | Evelyn | Morgan | emorgan2@altervista.org | Female | 7.161.136.94 | 6767119071901597.0 | Russia | 2/1/1960 | 144972.51 | Structural Engineer |  |


#### On AWS...

**Example:**

    bucket = "my_bucket"
    key1 = "partition_test/Table1/CREATED_YEAR=2015/*"
    key2 = "partition_test/Table1/CREATED_YEAR=2017/*"
    key3 = "partition_test/Table1/CREATED_YEAR=2018/*"

    test_df = spark.read.parquet('s3://'+bucket+'/'+key1,
                                 's3://'+bucket+'/'+key2,
                                 's3://'+bucket+'/'+key3)

    test_df.show(1)

###  Validating Data in PySpark

In [11]:
# Check the schema (data type, nullable, ...)
students.printSchema() 

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)



In [12]:
#Check out columns
students.columns

['gender',
 'race/ethnicity',
 'parental level of education',
 'lunch',
 'test preparation course',
 'math score',
 'reading score',
 'writing score']

In [13]:
# Check the data type of a specific column
students.schema['math score'].dataType


IntegerType

In [14]:
#To get summary
students.select("math score","reading score").summary("count","min","max").show()

+-------+----------+-------------+
|summary|math score|reading score|
+-------+----------+-------------+
|  count|      1000|         1000|
|    min|         0|           17|
|    max|       100|          100|
+-------+----------+-------------+



### How to specify data types



In [15]:
from pyspark.sql.types import *

In [16]:
#To set schema for dataframe.
#StructField(fieldName,Type_of_field,If_Nullable)

data_schema = [StructField("name",StringType(),True),
              StructField("email",StringType(),True),
              StructField("city",StringType(),True),
              StructField("mac",StringType(),True),
              StructField("timestamp",DateType(),True),
              StructField("creditcard",StringType(),True)]

In [17]:
final_struc = StructType(fields=data_schema)

In [18]:
people = spark.read.json(path+"people.json",schema=final_struc)

In [19]:
people.limit(4).toPandas()

|   | name | email | city | mac | timestamp | creditcard |
|---|---|---|---|---|---|---|
| 0 |  |  |  |  |  |  |
| 1 | Keeley Bosco | katlyn@jenkinsmaggio.net | Lake Gladysberg | 08:fd:0b:cd:77:f7 | 2015-04-25 | 1228-1221-1221-1431 |
| 2 | Rubye Jerde | juvenal@johnston.name |  | 90:4d:fa:42:63:a2 | 2015-04-25 | 1228-1221-1221-1431 |
| 3 | Miss Darian Breitenberg |  |  | f9:0e:d3:40:cb:e9 | 2015-04-25 |  |


In [20]:
#Check out the schema again!
people.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- timestamp: date (nullable = true)
 |-- creditcard: string (nullable = true)



#### Changing data types after a file has been read in

This is useful in two cases:

- Case 1: the DataFrame has so many fields that defining a full schema by hand is not worth the effort

- Case 2: only one or two column types need to change

### Writing Data

In [21]:
students

DataFrame[gender: string, race/ethnicity: string, parental level of education: string, lunch: string, test preparation course: string, math score: int, reading score: int, writing score: int]

In [28]:
"""save mode
- "error"  :: if data already exists at the target, an exception is thrown (this is the default).
- "append" :: if data/table already exists, the contents of the DataFrame are appended to the existing data.
- "overwrite" :: existing data is overwritten by the contents of the DataFrame.
- "ignore" :: if data already exists, the save does nothing: the DataFrame is not written and the existing data is left unchanged.
"""


# students.write.mode("overwrite").csv('write_test.csv')

# user1_3.write.mode("overwrite").partitionBy("gender").parquet("part_parquet/")



In [32]:
# Create a custom DataFrame from a list of tuples

values =[("Pear",10),("Orange",10),("Peach",5)]
df = spark.createDataFrame(values,['fruit','quant'])
df.show()

+------+-----+
| fruit|quant|
+------+-----+
|  Pear|   10|
|Orange|   10|
| Peach|    5|
+------+-----+

