## Reading different types of data, partitions, parallelization
##### The DataFrame API is preferred over the RDD API because it is generally much faster. The Dataset API is not available in Python.

#### 1. DataFrames are immutable; every transformation creates a new DataFrame.

#### 2. Spark datasets are represented as a list of entries.
       This list is broken into partitions stored on different machines. 
       Each partition holds a unique subset of the entries in the list. 
       Spark calls these datasets "Resilient Distributed Datasets" (RDDs).
#### 3. At a low level, everything is implemented as RDDs.

#### 4. DataFrames are ultimately represented as RDDs, with additional metadata.

#### 5. When you create a DataFrame, the collection is parallelized across partitions.

#### 6. Spark DataFrame schemas are defined as a collection of typed columns. The entire schema is stored as a StructType and individual columns are stored as StructFields.

## Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
#### Formats covered in this notebook
1. CSV Files
2. Text Files
3. JSON Records
4. Avro Files
5. Sequence Files
6. ORC Files
7. Parquet Files
8. XML files

In [None]:
### There are 3 different ways to create DataFrames in PySpark
    1. Read the raw data directly into a Spark DataFrame (spark.read)
    2. Create an RDD and pass it to createDataFrame
    3. Create a pandas DataFrame and pass it to createDataFrame

Differences between 1, 2 and 3
Number of partitions observed in this notebook: 1 for method 1, 2 for method 2, 8 for method 3
Method 1: Raw data => Spark DataFrame  # Best method: Spark needs fewer operations and less time to build its internal RDD
Method 2: Raw data => RDD => Spark DataFrame
Method 3: Raw data => pandas DataFrame => Spark DataFrame

In [4]:
from pyspark.sql import SparkSession
import pandas as pd

In [5]:
spark = SparkSession.builder.appName("fileformats").getOrCreate()

### 1. CSV files

In [6]:
# pandas route: read the CSV into a pandas DataFrame first, then convert it to a Spark DataFrame
pandas_df = pd.read_csv("Data/Employee_Statistics.csv")

In [7]:
pandas_df.dtypes

enrollee_id                 int64
city                       object
city_development_index    float64
gender                     object
relevent_experience        object
enrolled_university        object
education_level            object
major_discipline           object
experience                 object
company_size               object
company_type               object
last_new_job               object
training_hours              int64
target                      int64
dtype: object

In [8]:
from pyspark.sql.types import *

    DataType
            ArrayType
            MapType
            NullType
            StructField
            StructType
    AtomicType(DataType)
        BinaryType
        BooleanType
        DateType
        StringType
        TimestampType
    FractionalType(NumericType)
        DecimalType
        DoubleType
        FloatType
    IntegralType(NumericType)
        ByteType
        IntegerType
        LongType
        ShortType

In [9]:
## We define an explicit schema for the pandas DataFrame.
## Spark tries to infer the schema from the pandas data, but inference fails
## when a column mixes types, for example strings and NaN (a float) for missing values.

## df = spark.createDataFrame(pandas_df)

## Example of the error raised when the line above is run without a schema:

## TypeError: field company_size: 
##        Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>
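
The mixed-type problem behind this error can be reproduced with pandas alone. The sketch below uses a made-up three-value column, stored with `dtype=object` to mirror the `object` dtype shown in the `dtypes` listing above, and checks which Python types sit underneath.

```python
import pandas as pd

# Hypothetical column: two string values and one missing value stored as NaN.
sample = pd.DataFrame(
    {"company_size": pd.Series(["50-99", float("nan"), "<10"], dtype=object)}
)

# pandas reports one dtype for the column, but the Python values underneath differ.
value_types = sorted({type(value).__name__ for value in sample["company_size"]})
print(value_types)
```

A column holding both `str` and `float` values is the kind of input that schema inference cannot merge into one Spark type, which is why the explicit schema is passed.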


In [10]:
schema = StructType([
    StructField("enrollee_id", IntegerType(), False),
    StructField("city", StringType(), True),
    StructField("city_development_index", FloatType(), True),
    StructField("gender", StringType(), True),
    StructField("relevent_experience", StringType(), True),
    StructField("enrolled_university", StringType(), True),
    StructField("education_level", StringType(), True),
    StructField("major_discipline", StringType(), True),
    StructField("experience", StringType(), True),
    StructField("company_size", StringType(), True),
    StructField("company_type", StringType(), True),
    StructField("last_new_job", StringType(), True),
    StructField("training_hours", IntegerType(), True),
    StructField("target", IntegerType(), True),
])

In [11]:
df1= spark.createDataFrame(pandas_df, schema=schema)

In [None]:
# Direct read: the file is loaded straight into a Spark DataFrame (no RDD or pandas DataFrame in between)
df2 = spark.read.format("csv").options(header="true").load("Data/Employee_Statistics.csv")

In [None]:
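## Let us compare the direct read (df2) with the pandas route (df1)
## Direct read: every column is loaded as a string, because no schema was given and inferSchema is off by default
## pandas route with an explicit schema: gives more control over the column data types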

In [None]:
df2.printSchema()

In [None]:
df1.rdd.getNumPartitions()

In [None]:
df2.rdd.getNumPartitions()

In [None]:
## The pandas route (df1) is split into the default number of partitions when the DataFrame is created, while the small file read directly (df2) ends up in a single partition

In [None]:
df1.take(1)

In [None]:
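### RDD route: read the file as an RDD of lines, then convert it
RDD_csv = spark.sparkContext.textFile("Data/Employee_Statistics.csv")

In [None]:
## Each RDD element is one raw line (a str), so it cannot be passed to createDataFrame with the typed schema as-is.
## Drop the header and split each line first; the fields stay strings, so only the column names are supplied here.
## Note: a plain split(",") breaks on quoted fields that contain commas.
header = RDD_csv.first()
rows = RDD_csv.filter(lambda line: line != header).map(lambda line: line.split(","))
df_rdd = spark.createDataFrame(rows, schema=header.split(","))

In [None]:
df_rdd.rdd.getNumPartitions()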

## 2. Text files
##### This is similar to the CSV case. Let us see if we can spot any differences.


In [None]:
## createDataFrame accepts three kinds of input: a list, a pandas DataFrame, or an RDD
## So there are three options when reading data from an external source:
## a plain text file (such as a book page) can be read as a pandas DataFrame, as an RDD, or directly as a Spark DataFrame
## Use case: NLP problems

In [None]:
## createDataFrame input: an RDD of Row/tuple/list/dict, a list, or a pandas DataFrame

In [None]:
RDD_list = spark.sparkContext.textFile("Data/bookpage.txt")

In [None]:
## Let us check what this RDD looks like
RDD_list.take(10)

In [None]:
## Let us convert this RDD into a DataFrame

## df3 = spark.createDataFrame(RDD_list)

## Running the line above throws an error, because each element is a bare string rather than a row:

## TypeError: Can not infer schema for type: <class 'str'>


In [None]:
# Passing StringType() as the schema puts each line into a single column named "value"
df3 = spark.createDataFrame(RDD_list, StringType())

In [None]:
df3.take(1)

In [None]:
df3.rdd.getNumPartitions()

In [None]:
## pandas route: read the text file into a pandas DataFrame first

In [None]:
pandas_df = pd.read_table("Data/bookpage.txt", header=None, names=['PlainTextField'])

In [None]:
pandas_df.head(10)

In [None]:
df4 = spark.createDataFrame(pandas_df)

In [None]:
df4.take(1)

In [None]:
df4.rdd.getNumPartitions()

In [None]:
# Direct read: load the text file straight into a Spark DataFrame
Spark_Df = spark.read.text("Data/bookpage.txt")

In [None]:
Spark_Df.rdd.getNumPartitions()

In [None]:
Spark_Df.take(1)

## 3. JSON records.

In [None]:
# Direct read: build a Spark DataFrame straight from the JSON file

In [None]:
df5 = spark.read.json("Data/sparkify_log_small.json")

In [None]:
df5.rdd.getNumPartitions()

In [None]:
df5.printSchema()

In [None]:
# Using Pandas Dataframe
pandas_df = pd.read_json("Data/sparkify_log_small.json", lines=True)
pandas_df.dtypes

In [None]:
# Running the following line raises a TypeError, most likely because the artist column mixes strings with NaN.
# A schema is needed to map each pandas column to a Spark type.
# df6 = spark.createDataFrame(pandas_df)

# TypeError: field artist: 
#        Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

In [None]:
# To save the effort of writing a schema, I will create the Spark DataFrame directly from the raw data.
# But I have to make sure I still get the parallelism that Spark gives when converting a pandas DataFrame.

In [None]:
df5 = spark.read.json("Data/sparkify_log_small.json")

In [None]:
## A DataFrame is already optimized for parallel execution, so we do not need to give it a number of partitions
## A DataFrame is a distributed data structure. It is neither required nor possible to parallelize it. (source: Stack Overflow)

##### Partitions in more detail:
##### Spark uses Hadoop InputFormat under the hood, so input is read partition by partition, one per input split (source: Stack Overflow)
##### Partitions are logical divisions of an RDD's data (input splits are created from Hadoop blocks).
##### The default Hadoop block size is 128 MB, which is also the default partition size
##### Spark uses the MapReduce API to partition the data
##### Slice size = Math.max(minSize, Math.min(maxSize, blockSize)); the min and max sizes can be changed
##### The data is divided into n partitions of that slice size
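
The slice-size formula can be worked through with plain arithmetic. This is a simplified sketch of the formula as written in these notes, not Spark's or Hadoop's actual code; the function names are made up, and the real split logic has extra details (such as tolerance for a slightly larger last split) that are ignored here.

```python
MB = 1024 * 1024


def slice_size(min_size: int, max_size: int, block_size: int) -> int:
    """Slice size = max(min_size, min(max_size, block_size)), all in bytes."""
    return max(min_size, min(max_size, block_size))


def num_slices(file_size: int, size_per_slice: int) -> int:
    """Number of slices needed to cover the file (ceiling division)."""
    return -(-file_size // size_per_slice)


# With a tiny minimum and a huge maximum, the block size wins.
default_slice = slice_size(min_size=1, max_size=2**63 - 1, block_size=128 * MB)

# Lowering the maximum below the block size produces smaller slices.
capped_slice = slice_size(min_size=1, max_size=64 * MB, block_size=128 * MB)

print(default_slice // MB, capped_slice // MB)
print(num_slices(1000 * MB, default_slice))
```

Lowering the maximum size is therefore one way to get more, smaller partitions from the same file.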

In [None]:
## A common guideline is to keep the number of partitions between 100 and 10K;
## the lower and upper bounds should be determined from the size of the cluster and the data.

## The lower bound is 2 x the number of cores in the cluster available to the application.
## For the upper bound, each task should take at least 100 ms to execute. If tasks finish faster,
## the partitions may be too small, or the application may be spending too much time scheduling tasks.
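
These rules of thumb can be turned into a small calculation. The helper below is hypothetical: its name, its parameters, and the 128 MB target partition size are assumptions for illustration, not part of Spark.

```python
import math


def suggest_partitions(total_bytes: int, cores: int,
                       target_partition_bytes: int = 128 * 1024 * 1024) -> int:
    """Hypothetical helper: pick a partition count from data size and core count."""
    lower_bound = 2 * cores  # rule of thumb: at least 2 partitions per core
    by_size = math.ceil(total_bytes / target_partition_bytes)
    return max(lower_bound, by_size)


GIB = 1024 ** 3
print(suggest_partitions(10 * GIB, cores=8))
print(suggest_partitions(100 * 1024 * 1024, cores=8))
```

For small inputs the core-based lower bound decides the count; for large inputs the data size does. The 100 ms task-time check still has to be done by looking at actual task durations.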

In [None]:
## We can repartition based on the use case,
## but repartitioning involves a shuffle, which adds network cost, so it should be used with care

## 4. Avro files

Avro is a schema-based serialization format.
Avro serializes data together with its schema.
The data is serialized into a compact binary format that any application can deserialize.
Avro uses JSON to declare the data structures.
The resulting serialized data is smaller. The schema is stored along with the Avro data in the file for any further processing.
PySpark provides from_avro and to_avro functions in pyspark.sql.avro.functions.

In [None]:
## Converting a Spark DataFrame column to Avro
## Note: Avro support lives in the external spark-avro module, which has to be available to the Spark session

from pyspark.sql.avro.functions import to_avro

## to_avro converts a column into Avro binary format. This is useful with Kafka.
## https://spark.apache.org/docs/3.0.0-preview/api/python/_modules/pyspark/sql/avro/functions.html

In [None]:
from pyspark.sql import Row
from pyspark.sql.avro.functions import to_avro
data = ['SPADES']
df = spark.createDataFrame(data, "string")
df.select(to_avro(df.value).alias("suite")).collect()

In [None]:
## Writing a Spark DataFrame to Avro
df1.write.format("avro").save("Data/test3.avro")

## By default, Avro output is written as one file per DataFrame partition:
## a DataFrame with 2 partitions produces 2 part files, one with 8 partitions produces 8

In [None]:
## Reading the Avro file back into a Spark DataFrame
df6 = spark.read.format("avro").load("Data/test3.avro")

In [None]:
df6.take(1)

In [None]:
## Compression options when storing Avro data on disk
## https://spark.apache.org/docs/3.0.0/sql-data-sources-avro.html

## Compression codec used when writing Avro files.
## Supported codecs: uncompressed, deflate, snappy, bzip2 and xz. The default codec is snappy.
df1.write.format("avro").option("compression", "deflate").save("Data/test4.avro")

In [None]:
## partitionBy with Avro: the data can be partitioned by any column when it is saved to disk

df6.write.partitionBy("major_discipline").format("avro").save("custom_partitioned.avro")


<img src="Data/Capture.PNG">

In [None]:
## Schema in Avro
## Avro schemas are usually stored in files with the .avsc extension, written in JSON.
## We can supply the schema through option() when reading an Avro file.
## The schema describes the structure of the Avro file: the field names and their data types.
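
To make the schema notes concrete, here is a sketch of what an Avro schema looks like as JSON. The record name `Employee` and the choice of two fields are assumptions for illustration; the commented-out read at the end assumes the spark-avro module is available and uses its `avroSchema` option.

```python
import json

# Hypothetical Avro record schema for two columns of the employee data.
avro_schema = {
    "type": "record",
    "name": "Employee",
    "fields": [
        {"name": "enrollee_id", "type": "int"},
        # A union with "null" makes the field optional.
        {"name": "city", "type": ["null", "string"], "default": None},
    ],
}

# An .avsc file holds this kind of JSON document.
avro_schema_json = json.dumps(avro_schema, indent=2)
print(avro_schema_json)

# With spark-avro available, the JSON string could be passed when reading, e.g.:
# df = spark.read.format("avro").option("avroSchema", avro_schema_json).load("Data/test3.avro")
```

A schema supplied this way has to be compatible with the schema stored in the file being read.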

# 5. SEQUENCE FILES

Sequence files are a Hadoop-specific file format that stores data as serialized key-value pairs, where serialized means stored as a stream of bytes. A Hadoop sequence file is a flat file consisting of binary key-value pairs. This is the same format in which data is stored internally during the processing of MapReduce tasks.

What is the purpose of a sequence file?

1) To store and process binary data

2) To pack many small files into a single large SequenceFile for MapReduce computation, since the design of Hadoop
prefers large files. Sequence files work well as containers for smaller files. HDFS and MapReduce are optimized for
large files, so packing small files into a sequence file makes storing and processing them more efficient.

In [None]:
## How to create, write and read a sequence file, and understand its structure
## Sequence file => Spark DataFrame => sequence file  # There is no DataFrame API for sequence files
## We can use the RDD API instead: sc.sequenceFile to read and RDD.saveAsSequenceFile to write

In [None]:
RDD_csv = spark.sparkContext.textFile("Data/Employee_Statistics.csv")

In [None]:
## Running this code:
## RDD_csv.saveAsSequenceFile("Data/squenceTest")
## raised this error:
# Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile.
# : org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used

## So the data has to be converted into key-value pairs before it can be saved as a sequence file.

In [None]:
## The typical use case for sequence files is a collection of small files
RDD_text = spark.sparkContext.textFile("Datasets_seq")

In [None]:
pairRDD = RDD_text.map(lambda x:(None,x))

In [None]:
pairRDD.saveAsSequenceFile("dataseq_1")

In [None]:
#Reading sequence file
RDD_seq = spark.sparkContext.sequenceFile("dataseq_1")

# 6. ORC Files

1. ORC stands for Optimized Row Columnar; it stores data in a more optimized way than many other file formats.

2. ORC can reduce the size of the original data by up to 75% (source: Nxt Gen). As a result, data processing is faster, and ORC performs better than the Text, Sequence and RC file formats.

3. An ORC file contains row data in groups called stripes, along with a file footer.

4. The ORC file format provides a very efficient way to store relational data.

5. ORC takes less time to access data and less space to store it.

6. However, ORC increases CPU overhead, because decompressing the data takes extra time.

(Source: Hortonworks DataWorks Summit 2017)

<img src="Data/DataWorksSummit_ORC.PNG">

In [13]:
# Let us create an ORC file from the Spark DataFrame.
df1.write.format("orc").save("Data/orcfile")

In [15]:
df = spark.read.format("orc").load("Data/orcfile")

# 7. Parquet Files

Parquet is a columnar storage format: when querying, non-relevant data can be skipped very quickly. It also supports advanced nested data structures. The layout of Parquet data files is optimized for queries that process large volumes of data, in the gigabyte range.
Most cloud providers charge based on the amount of data scanned per query and the amount of data stored, so Parquet can be a cost saver (source: Databricks).

<img src="Data/parquat_dist.PNG">

In [20]:
# Let us create a Parquet file from the Spark DataFrame.
df1.write.format("parquet").save("Data/parfile")

In [21]:
df = spark.read.format("parquet").load("Data/parfile")

## Which one is best? ORC, Parquet or Avro?

#### 1. GENERAL RULE 

In general, if the data is wide, has a large number of attributes and is write-heavy, 
then a row-based approach may be best (**AVRO**).
If the data is narrower, 
has fewer attributes, and is read-heavy, then a column-based approach may be best
(**PARQUET OR ORC**, depending on the platform you use).
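
The difference between the two approaches can be illustrated with plain Python structures. This is a toy model with made-up records, not the actual on-disk layout of Avro, Parquet or ORC.

```python
# Three hypothetical records with three attributes each.
records = [
    {"id": 1, "city": "city_103", "hours": 36},
    {"id": 2, "city": "city_40", "hours": 47},
    {"id": 3, "city": "city_21", "hours": 83},
]

# Row-based layout: all values of one record sit together,
# so appending a new record is a single write.
row_layout = [tuple(record.values()) for record in records]

# Column-based layout: all values of one attribute sit together.
column_layout = {key: [record[key] for record in records] for key in records[0]}

# Reading a single attribute touches every row in the row layout ...
hours_from_rows = [row[2] for row in row_layout]
# ... but only one list in the column layout.
hours_from_columns = column_layout["hours"]

print(sum(hours_from_rows), sum(hours_from_columns))
```

Write-heavy workloads favor the row layout because each new record lands in one place, while read-heavy analytical queries favor the column layout because unused attributes are never touched.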

#### 2. Read speed: ORC > Avro > Parquet > JSON (depends on the use case)
   Garbage collection: Parquet > ORC > Avro

# 8. XML DATA