#### Implied Concepts 2020 - MIT License

# Dataframes in Pyspark

## class pyspark.sql.DataFrame(jdf, sql_ctx)
### A distributed collection of data grouped into named columns.
### A DataFrame is equivalent to a relational table in Spark SQL, and can be created using various functions in SparkSession:

In [1]:
from pyspark.sql import SparkSession
# Start (or reuse) a local Spark session named 'DataFrames Handson',
# using the 'local[5]' master.
spark = (
    SparkSession.builder
    .master('local[5]')
    .appName('DataFrames Handson')
    .getOrCreate()
)

ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Load the CSV file into `df` as a Spark DataFrame. header=True takes the
# column names from the first row; inferSchema=True lets Spark infer each
# column's type from the data.
# (How DataFrames differ from RDDs has already been covered.)
df = spark.read.csv(
    'H:/Training/PySpark/data/2008csv/2008csv_Pieces/2008_1.csv',
    header=True,
    inferSchema=True,
)

In [None]:
type(df)

# show(n=20, truncate=True, vertical=False)
## Prints the first n rows to the console.

### Parameters

### n – Number of rows to show.
### truncate – If set to True, truncate strings longer than 20 chars by default. If set to a number greater than one, truncates long strings to length truncate and align cells right.
### vertical – If set to True, print output rows vertically (one line per column value).

In [None]:
#show method will display first 20 rows of dataframe

df.show()

# count()
## Returns the number of rows in this DataFrame.

In [None]:
df.count()

# columns
## Returns the columns in this DataFrame

In [None]:
df.columns

In [None]:
print((df.count(), len(df.columns)))

## printSchema()
### Prints the schema of the DataFrame. This is useful when creating tables in SQL, defining Spark DataFrames, etc.

In [None]:
df.printSchema()

# DataFrame Transformations

## Caching stores the data in memory (RAM). This speeds up retrieving DataFrames or RDDs for repeated operations.
## There is also the persist() method, which accepts a storage level so the data can be kept in memory, on disk, or both.

In [None]:
df.cache().show()

# dropDuplicates(subset=None)
### Return a new DataFrame with duplicate rows removed, optionally only considering certain columns.

### For a static batch DataFrame, it just drops duplicate rows. For a streaming DataFrame, it will keep all data across triggers as intermediate state to drop duplicates rows. You can use withWatermark() to limit how late the duplicate data can be and system will accordingly limit the state. In addition, too late data older than watermark will be dropped to avoid any possibility of duplicates.

### drop_duplicates() is an alias for dropDuplicates().

In [None]:
# Drop the duplicate rows from the dataframe

df.dropDuplicates().show()

# dropna(how='any', thresh=None, subset=None)
### Returns a new DataFrame omitting rows with null values. DataFrame.dropna() and DataFrameNaFunctions.drop() are aliases of each other.

## Parameters
### how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.

### thresh – int, default None If specified, drop rows that have less than thresh non-null values. This overwrites the how parameter.

### subset – optional list of column names to consider.

In [None]:
# how='all' drops a row only when every one of its values is null.
# Print the row counts before and after to see how many rows were removed.

dropna_all = df.dropna(how='all')
print(df.count(), dropna_all.count())

In [None]:
# how='any' drops a row as soon as one of its values is null.
dropna_any = df.dropna(how='any')
print(df.count(), dropna_any.count())

In [None]:
    dropna_thres=  df.dropna( thresh=28) # it overwrites HOW
print( df.count(), dropna_thres.count())

In [None]:
# Only the listed columns are checked for nulls; a row is dropped when any
# of them is null.
dropna_subset = df.dropna(
    how='any',
    subset=[
        'Origin', 'Dest', 'Distance',
        'LateAircraftDelay', 'SecurityDelay', 'SDelay',
    ],
)
print(df.count(), dropna_subset.count())


# select(*cols)
### Projects a set of expressions and returns a new DataFrame.

### Parameters
### cols – list of column names (string) or expressions (Column). If one of the column names is ‘*’, that column is expanded to include all columns in the current DataFrame.



In [None]:
# Project the DataFrame down to the columns of interest and display them.
columns = [
    'Origin', 'Dest', 'Distance',
    'LateAircraftDelay', 'SecurityDelay', 'SDelay',
]
df.select(*columns).show()

# distinct()
### Returns a new DataFrame containing the distinct rows in this DataFrame.

In [None]:
# Select the columns of interest, then keep only the distinct rows.
columns = [
    'Origin', 'Dest', 'Distance',
    'LateAircraftDelay', 'SecurityDelay', 'SDelay',
]

(
    df.select(*columns)
    .distinct()
    .show()
)

## fillna(value, subset=None)
### Replace null values, alias for na.fill(). DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other.

### Parameters
### value – int, long, float, string, bool or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, boolean, or string.

### subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.

In [None]:
df.show(1)

In [None]:
# replace the nas with specific string or number 

df.fillna(-1).show(1)

# filter(condition)
### Filters rows using the given condition.

## where() is an alias for filter().

### Parameters
### condition – a Column of types.BooleanType or a string of SQL expression.

In [None]:
#Filters rows using the given condition.

df.filter(df.FlightNum>500).show()

# describe(*cols)
### Computes basic statistics for numeric and string columns.
### This includes count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns.

In [15]:
# describe() computes basic statistics (count, mean, stddev, min and max)
# for numeric and string columns. When no columns are passed it covers all
# numerical or string columns of the DataFrame it is called on.

(
    df.select("Origin", "Dest", "Distance")
    .describe()
    .show()
)

+-------+------+-----+-----------------+
|summary|Origin| Dest|         Distance|
+-------+------+-----+-----------------+
|  count| 50000|50000|            50000|
|   mean|  null| null|         622.7933|
| stddev|  null| null|442.4061362129782|
|    min|   ABQ|  ABQ|              133|
|    max|   TUS|  TUS|             2363|
+-------+------+-----+-----------------+



In [16]:
# summary() computes the requested statistics for numeric and string columns
# (available: count, mean, stddev, min, max). With no arguments it reports
# count, mean, stddev, min, the approximate quartiles (percentiles at 25%,
# 50% and 75%) and max.

(
    df.select("Origin", "Dest", "Distance")
    .summary()
    .show()
)



+-------+------+-----+-----------------+
|summary|Origin| Dest|         Distance|
+-------+------+-----+-----------------+
|  count| 50000|50000|            50000|
|   mean|  null| null|         622.7933|
| stddev|  null| null|442.4061362129782|
|    min|   ABQ|  ABQ|              133|
|    25%|  null| null|              319|
|    50%|  null| null|              446|
|    75%|  null| null|              838|
|    max|   TUS|  TUS|             2363|
+-------+------+-----+-----------------+



# groupBy(*cols)
## Groups the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.

### groupby() is an alias for groupBy().

### Parameters
### cols – list of columns to group by. Each element should be a column name (string) or an expression (Column).



In [18]:
#Groups the DataFrame using the specified columns, so we can run aggregation on them. 

df.groupBy('Dest').count().show()

+----+-----+
|Dest|count|
+----+-----+
| MSY|  514|
| GEG|  237|
| BUR|  893|
| PVD|  496|
| OAK| 2086|
| ORF|  193|
| CMH|  421|
|   B| 1263|
| SJC| 1154|
| BUF|  250|
| AUS|  712|
| RNO|  595|
| RSW|  166|
| TUL|  286|
| HRL|  168|
| AMA|  172|
| ISP|  417|
| MAF|  175|
| LAS| 3530|
| JAN|  132|
+----+-----+
only showing top 20 rows




# orderBy(*cols, **kwargs)
## Returns a new DataFrame sorted by the specified column(s).

## Parameters
### cols – list of Column or column names to sort by.
### ascending – boolean or list of boolean (default True). Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the cols.


In [21]:
# Method 1, Case 1: a single `ascending` flag applies to every sort column,
# so all three columns are sorted in descending order.
(
    df.select("Origin", "Dest", "Distance")
    .orderBy(["Origin", "Dest", "Distance"], ascending=False)
    .show()
)

+------+----+--------+
|Origin|Dest|Distance|
+------+----+--------+
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
|   TUS| SAN|     367|
+------+----+--------+
only showing top 20 rows



In [22]:
# Method 1, Case 2: one flag per sort column -- Origin and Dest ascending,
# Distance descending. Show the first 100 rows.
(
    df.select("Origin", "Dest", "Distance")
    .orderBy(["Origin", "Dest", "Distance"], ascending=[True, True, False])
    .show(100)
)

+------+----+--------+
|Origin|Dest|Distance|
+------+----+--------+
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| AMA|     277|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| BWI|    1670|
|   ABQ| DAL|     580|
|   ABQ| DAL|     580|
|   ABQ| DAL|     580|
|   ABQ| DAL|     580|
|   ABQ| DAL|     580|
|   ABQ| DAL|     580|
|   ABQ| DAL|     580|
|   ABQ| DAL|     580|
|   ABQ| DAL|     580|
|   ABQ| DAL|     580|
|   ABQ| DA

In [23]:
# Method 2: pass a Column sort expression instead of the `ascending` keyword.
# Here the rows are sorted by Dest in descending order.

(
    df.select("Origin", "Dest", "Distance")
    .orderBy(df.Dest.desc())
    .show()
)


+------+----+--------+
|Origin|Dest|Distance|
+------+----+--------+
|   LAS| TUS|     365|
|   LAS| TUS|     365|
|   LAS| TUS|     365|
|   LAS| TUS|     365|
|   LAS| TUS|     365|
|   LAS| TUS|     365|
|   LAX| TUS|     451|
|   LAX| TUS|     451|
|   LAX| TUS|     451|
|   LAX| TUS|     451|
|   LAX| TUS|     451|
|   LAX| TUS|     451|
|   MDW| TUS|    1440|
|   MDW| TUS|    1440|
|   OAK| TUS|     747|
|   SAN| TUS|     367|
|   SAN| TUS|     367|
|   SAN| TUS|     367|
|   SAN| TUS|     367|
|   ABQ| TUS|     321|
+------+----+--------+
only showing top 20 rows



In [24]:
# Same Column-expression form, this time sorting by Dest in ascending order.
(
    df.select("Origin", "Dest", "Distance")
    .orderBy(df.Dest.asc())
    .show()
)

+------+----+--------+
|Origin|Dest|Distance|
+------+----+--------+
|   DEN| ABQ|     349|
|   LAX| ABQ|     677|
|   DEN| ABQ|     349|
|   DAL| ABQ|     580|
|   DEN| ABQ|     349|
|   DAL| ABQ|     580|
|   HOU| ABQ|     759|
|   HOU| ABQ|     759|
|   DAL| ABQ|     580|
|   HOU| ABQ|     759|
|   DAL| ABQ|     580|
|   LAS| ABQ|     487|
|   DAL| ABQ|     580|
|   LAS| ABQ|     487|
|   DAL| ABQ|     580|
|   LAS| ABQ|     487|
|   ELP| ABQ|     223|
|   LAS| ABQ|     487|
|   DAL| ABQ|     580|
|   LAS| ABQ|     487|
+------+----+--------+
only showing top 20 rows



# registerDataFrameAsTable(df, tableName)
### Registers the given DataFrame as a temporary table in the catalog.
### Temporary tables exist only during the lifetime of this instance of SQLContext.

In [25]:
# Register `df` under the name "table1" so it can be queried with SQL.
# This is the DataFrame-side equivalent of
# sqlContext.registerDataFrameAsTable(df, "table1"), but it uses the `spark`
# session created at the top of this notebook; `sqlContext` is never defined
# here and only exists as a predefined name in the pyspark shell.
df.createOrReplaceTempView("table1")
df2 = spark.sql("SELECT Dest AS Dest, Distance as Distance from table1")
df2.show()

+----+--------+
|Dest|Distance|
+----+--------+
| TPA|     810|
| TPA|     810|
| BWI|     515|
| BWI|     515|
| BWI|     515|
| JAX|     688|
| LAS|    1591|
| LAS|    1591|
| MCI|     451|
| MCI|     451|
| MCO|     828|
| MCO|     828|
| MDW|     162|
| MDW|     162|
| MDW|     162|
| MDW|     162|
| PHX|    1489|
| PHX|    1489|
| TPA|     838|
| BWI|     220|
+----+--------+
only showing top 20 rows



# toPandas()
### Returns the contents of this DataFrame as Pandas pandas.DataFrame.

### This is only available if Pandas is installed and available.

In [26]:
df.toPandas()

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,SDelay,SecurityDelay,LateAircraftDelay
0,2008,1,3,4,2003.0,1955,2211.0,2225,WN,335,...,4.0,8.0,0,,0,,,,,
1,2008,1,3,4,754.0,735,1002.0,1000,WN,3231,...,5.0,10.0,0,,0,,,,,
2,2008,1,3,4,628.0,620,804.0,750,WN,448,...,3.0,17.0,0,,0,,,,,
3,2008,1,3,4,926.0,930,1054.0,1100,WN,1746,...,3.0,7.0,0,,0,,,,,
4,2008,1,3,4,1829.0,1755,1959.0,1925,WN,3920,...,3.0,10.0,0,,0,2.0,0.0,0.0,0.0,32.0
5,2008,1,3,4,1940.0,1915,2121.0,2110,WN,378,...,4.0,10.0,0,,0,,,,,
6,2008,1,3,4,1937.0,1830,2037.0,1940,WN,509,...,3.0,7.0,0,,0,10.0,0.0,0.0,0.0,47.0
7,2008,1,3,4,1039.0,1040,1132.0,1150,WN,535,...,7.0,7.0,0,,0,,,,,
8,2008,1,3,4,617.0,615,652.0,650,WN,11,...,6.0,19.0,0,,0,,,,,
9,2008,1,3,4,1620.0,1620,1639.0,1655,WN,810,...,3.0,6.0,0,,0,,,,,


## DataFrame.write
### Interface used to write a DataFrame to external storage systems

In [27]:
# Save `df` in CSV format at 'dataframes.csv', using the 'overwrite' save mode.
(
    df.write
    .mode('overwrite')
    .csv('dataframes.csv')
)