# DataFrame 

- What is a Spark DataFrame?

- How is it different from an RDD?

In [1]:
# Initialize Spark for use in the Jupyter notebook

import findspark
findspark.init()


# SparkSession is what we use below to read data into a PySpark DataFrame

from pyspark.sql import SparkSession

In [2]:
# Initialize the SparkSession: getOrCreate() returns the existing session
# if there is one, otherwise it builds a new one with the settings below

spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [3]:
# Read the CSV file into a Spark DataFrame.
# With no options, the header line is read as a data row and every
# column is read as a string (see the next two cells).

df = spark.read.csv('country_data.csv')

In [4]:
# show(n) prints the first n rows of the DataFrame.
# Notice that the column names are auto-generated (_c0, _c1, ...) and
# that the real header appears as the first data row.

df.show(5)

+----+-----------+----+-----------------+
| _c0|        _c1| _c2|              _c3|
+----+-----------+----+-----------------+
|null|    Country|Year|              GDP|
|   0|Afghanistan|2015|584.2592099999999|
|   1|Afghanistan|2014|       612.696514|
|   2|Afghanistan|2013|       631.744976|
|   3|Afghanistan|2012|          669.959|
+----+-----------+----+-----------------+
only showing top 5 rows



In [5]:
# What type does each column of the DataFrame have?
# Every column, including Year and GDP, was read as a string,
# which can cause problems in numeric operations.

df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Explicit schema: one (column name, Spark type) pair per CSV column;
# every column is declared nullable.
schema = StructType([
    StructField(column, spark_type, True)
    for column, spark_type in (
        ('_c0', IntegerType()),
        ('Country', StringType()),
        ('Year', IntegerType()),
        ('GDP', FloatType()),
    )
])

In [7]:
# Re-read the file, this time treating the first line as a header and
# applying the explicit schema, so the columns get proper names and types

df = spark.read.csv('country_data.csv', header=True, schema=schema)
df.show(5)

+---+-----------+----+---------+
|_c0|    Country|Year|      GDP|
+---+-----------+----+---------+
|  0|Afghanistan|2015| 584.2592|
|  1|Afghanistan|2014|612.69653|
|  2|Afghanistan|2013|  631.745|
|  3|Afghanistan|2012|  669.959|
|  4|Afghanistan|2011| 63.53723|
+---+-----------+----+---------+
only showing top 5 rows



In [8]:
# Confirm that the columns now carry the names and types of the explicit schema
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- GDP: float (nullable = true)



In [9]:
# toDF() returns a new DataFrame with all columns renamed (one name per
# column, in order); df itself keeps its original column names

df.toDF('new_name_1', 'new_name_2', 'new_name_3', 'new_name_4').show(5)

+----------+-----------+----------+----------+
|new_name_1| new_name_2|new_name_3|new_name_4|
+----------+-----------+----------+----------+
|         0|Afghanistan|      2015|  584.2592|
|         1|Afghanistan|      2014| 612.69653|
|         2|Afghanistan|      2013|   631.745|
|         3|Afghanistan|      2012|   669.959|
|         4|Afghanistan|      2011|  63.53723|
+----------+-----------+----------+----------+
only showing top 5 rows



In [10]:
# In pandas, head() does a similar thing. It is slightly different here:
# it returns a Python list of Row objects instead of printing a table.
# This method is not recommended for large datasets.

df.head(5)

[Row(_c0=0, Country='Afghanistan', Year=2015, GDP=584.2592163085938),
 Row(_c0=1, Country='Afghanistan', Year=2014, GDP=612.696533203125),
 Row(_c0=2, Country='Afghanistan', Year=2013, GDP=631.7449951171875),
 Row(_c0=3, Country='Afghanistan', Year=2012, GDP=669.958984375),
 Row(_c0=4, Country='Afghanistan', Year=2011, GDP=63.5372314453125)]

In [11]:
# take(n) does a similar thing: it returns the first n rows as a list of Row objects

df.take(3)

[Row(_c0=0, Country='Afghanistan', Year=2015, GDP=584.2592163085938),
 Row(_c0=1, Country='Afghanistan', Year=2014, GDP=612.696533203125),
 Row(_c0=2, Country='Afghanistan', Year=2013, GDP=631.7449951171875)]

In [12]:
# Select a single column from the DataFrame (the result is again a DataFrame)

df.select('Country').show(5)

+-----------+
|    Country|
+-----------+
|Afghanistan|
|Afghanistan|
|Afghanistan|
|Afghanistan|
|Afghanistan|
+-----------+
only showing top 5 rows



In [13]:
# Select two columns from the DataFrame

df.select('Country', 'GDP').show(5)

+-----------+---------+
|    Country|      GDP|
+-----------+---------+
|Afghanistan| 584.2592|
|Afghanistan|612.69653|
|Afghanistan|  631.745|
|Afghanistan|  669.959|
|Afghanistan| 63.53723|
+-----------+---------+
only showing top 5 rows



In [14]:
# Drop an unwanted column (here the index column _c0) from the DataFrame.
# drop() returns a new DataFrame; df itself still contains _c0.

df.drop('_c0').show(5)

+-----------+----+---------+
|    Country|Year|      GDP|
+-----------+----+---------+
|Afghanistan|2015| 584.2592|
|Afghanistan|2014|612.69653|
|Afghanistan|2013|  631.745|
|Afghanistan|2012|  669.959|
|Afghanistan|2011| 63.53723|
+-----------+----+---------+
only showing top 5 rows



In [15]:
# Convert the Spark DataFrame to a pandas DataFrame and preview it with pandas' head()

df.toPandas().head()

Unnamed: 0,_c0,Country,Year,GDP
0,0,Afghanistan,2015,584.259216
1,1,Afghanistan,2014,612.696533
2,2,Afghanistan,2013,631.744995
3,3,Afghanistan,2012,669.958984
4,4,Afghanistan,2011,63.537231


In [16]:
# By default the data is ordered by country.
# orderBy() sorts by a different column; ascending=False puts the largest GDP first.

df.orderBy('GDP', ascending=False).show(5)

+----+----------+----+---------+
| _c0|   Country|Year|      GDP|
+----+----------+----+---------+
|1539|Luxembourg|2014|119172.74|
|1542|Luxembourg|2011|115761.58|
|1545|Luxembourg|2008|114293.84|
|1540|Luxembourg|2013|113751.85|
|1547|Luxembourg|2006| 89739.71|
+----+----------+----+---------+
only showing top 5 rows



In [17]:
# Descriptive statistics of the DataFrame (count, mean, stddev, min, max).
# The GDP count is lower than the row count because null values are not counted.

df.describe().show()

+-------+-----------------+-----------+------------------+------------------+
|summary|              _c0|    Country|              Year|               GDP|
+-------+-----------------+-----------+------------------+------------------+
|  count|             2938|       2938|              2938|              2490|
|   mean|           1468.5|       null|2007.5187202178352| 7483.158462069025|
| stddev|848.2718707269896|       null| 4.613840940258099|14270.169327273006|
|    min|                0|Afghanistan|              2000|           1.68135|
|    max|             2937|   Zimbabwe|              2015|         119172.74|
+-------+-----------------+-----------+------------------+------------------+



In [18]:
# Descriptive statistics of the selected columns only

df.describe('Year', 'GDP').show()

+-------+------------------+------------------+
|summary|              Year|               GDP|
+-------+------------------+------------------+
|  count|              2938|              2490|
|   mean|2007.5187202178352| 7483.158462069025|
| stddev| 4.613840940258099|14270.169327273006|
|    min|              2000|           1.68135|
|    max|              2015|         119172.74|
+-------+------------------+------------------+



In [19]:
# If we only need a single statistic, agg() with a {column: function} dict is the way

df.agg({'Year':'min'}).show()

+---------+
|min(Year)|
+---------+
|     2000|
+---------+



In [20]:
# A dict can hold only one entry per column, so this form cannot compute
# two statistics of the same column at once. We will see how to do that shortly.

df.agg({'GDP':'mean'}).show()

+-----------------+
|         avg(GDP)|
+-----------------+
|7483.158462069025|
+-----------------+



In [21]:
# summary() is similar to describe() but more versatile: by default it
# also reports the 25%, 50% and 75% percentiles, and the statistics can
# be chosen, as we see below.

df.summary().show()

+-------+-----------------+-----------+------------------+------------------+
|summary|              _c0|    Country|              Year|               GDP|
+-------+-----------------+-----------+------------------+------------------+
|  count|             2938|       2938|              2938|              2490|
|   mean|           1468.5|       null|2007.5187202178352| 7483.158462069025|
| stddev|848.2718707269896|       null| 4.613840940258099|14270.169327273006|
|    min|                0|Afghanistan|              2000|           1.68135|
|    25%|              734|       null|              2004|         463.85263|
|    50%|             1468|       null|              2008|         1764.9739|
|    75%|             2203|       null|              2012|          5918.199|
|    max|             2937|   Zimbabwe|              2015|         119172.74|
+-------+-----------------+-----------+------------------+------------------+



In [22]:
# We can request specific statistics only, e.g. the mean and the median ('50%').

df.summary('mean', '50%').show()

+-------+------+-------+------------------+-----------------+
|summary|   _c0|Country|              Year|              GDP|
+-------+------+-------+------------------+-----------------+
|   mean|1468.5|   null|2007.5187202178352|7483.158462069025|
|    50%|  1468|   null|              2008|        1764.9739|
+-------+------+-------+------------------+-----------------+



In [23]:
# We can choose which columns to analyze by selecting them first

df.select('GDP', 'Year').summary().show()

+-------+------------------+------------------+
|summary|               GDP|              Year|
+-------+------------------+------------------+
|  count|              2490|              2938|
|   mean| 7483.158462069025|2007.5187202178352|
| stddev|14270.169327273006| 4.613840940258099|
|    min|           1.68135|              2000|
|    25%|         463.85263|              2004|
|    50%|         1764.9739|              2008|
|    75%|          5918.199|              2012|
|    max|         119172.74|              2015|
+-------+------------------+------------------+



In [24]:
# And finally we can choose both the columns and the statistics

df.select('GDP').summary('mean', 'stddev').show()

+-------+------------------+
|summary|               GDP|
+-------+------------------+
|   mean| 7483.158462069025|
| stddev|14270.169327273006|
+-------+------------------+



In [25]:
# Here is another way to achieve the same thing, using aggregate functions.
# But summary() is the way to go because of its simplicity.
# Note that stddev() appears as stddev_samp (sample standard deviation) in the output.

from pyspark.sql.functions import avg, stddev

df.agg(avg('GDP'), stddev('GDP')).show()

+-----------------+------------------+
|         avg(GDP)|  stddev_samp(GDP)|
+-----------------+------------------+
|7483.158462069025|14270.169327273006|
+-----------------+------------------+



In [26]:
# Calculate approximate percentiles (quantiles) of a column.
# The first argument is the column the calculation is made on.
# The second is the list of quantiles to compute: 0.5 means the 50th percentile, and so on.
# The last argument is the allowed relative error in the rank of each returned
# value (0 asks for exact quantiles); it is not a number of decimal places.
# That is why the values below differ slightly from the summary() percentiles.

df.approxQuantile('GDP', [0.25, 0.5, 0.75], 0.01)

[452.46319580078125, 1757.177978515625, 5734.443359375]

In [27]:
# freqItems() finds the items that occur frequently in the given columns.
# The second argument (0.01) is the support: the minimum fraction of rows
# in which an item must appear to be considered frequent.

df.freqItems(['GDP'], 0.01).show()

+--------------------+
|       GDP_freqItems|
+--------------------+
|[39439.82, 36161....|
+--------------------+



In [28]:
# corr() calculates the correlation between two columns

df.corr('GDP', 'Year')

0.09047997064705468

In [29]:
# Take a random sample from the DataFrame.
# withReplacement says whether or not rows may be drawn more than once.
# fraction is the approximate fraction of rows to keep in the sample;
# the sample size is not guaranteed to be exactly fraction * df.count().
# A fixed seed makes the sample reproducible when the notebook is re-run.

df_sample = df.sample(withReplacement=False, fraction=0.01, seed=42)
df_sample.show(4)

+---+-------------+----+---------+
|_c0|      Country|Year|      GDP|
+---+-------------+----+---------+
|129|      Austria|2014| 51322.64|
|270|       Belize|2001|3419.2756|
|288|       Bhutan|2015|2613.6453|
|445|Côte d'Ivoire|2002|     null|
+---+-------------+----+---------+
only showing top 4 rows



In [30]:
# count() returns the total number of rows in the DataFrame.
# We see that df_sample holds roughly (not exactly) 0.01 of the rows of df.

print(df.count())
print(df_sample.count())

2938
33


In [31]:
# Pivot example (left commented out, so it is not executed):
# df.groupBy('Country').pivot('Year').sum().show(4)

In [32]:
# distinct() removes the duplicate rows from the selection

countries = df.select('Country').distinct()
countries.show(10)

+--------------------+
|             Country|
+--------------------+
|       Côte d'Ivoire|
|                Chad|
|Micronesia (Feder...|
|            Paraguay|
|               Yemen|
|             Senegal|
|          Cabo Verde|
|              Sweden|
|            Kiribati|
|   Republic of Korea|
+--------------------+
only showing top 10 rows



In [33]:
# sort() orders the DataFrame by the given column name

countries.sort('Country').show(10)

+-------------------+
|            Country|
+-------------------+
|        Afghanistan|
|            Albania|
|            Algeria|
|             Angola|
|Antigua and Barbuda|
|          Argentina|
|            Armenia|
|          Australia|
|            Austria|
|         Azerbaijan|
+-------------------+
only showing top 10 rows



In [34]:
# Now we can count the number of distinct countries

countries.count()

193

In [35]:
# groupBy() groups the rows by the values of the selected column(s).
# Usually we want to compute some aggregate for each group
# after grouping (here, the number of rows per country).

df.groupBy('Country').count().show(4)

+--------------------+-----+
|             Country|count|
+--------------------+-----+
|       Côte d'Ivoire|   16|
|                Chad|   16|
|Micronesia (Feder...|   16|
|            Paraguay|   16|
+--------------------+-----+
only showing top 4 rows



In [36]:
# We can aggregate several columns at once after grouping
# (here the mean of GDP and the mean of Year for each country)

df.groupBy('Country').mean('GDP', 'Year').show(5)

+--------------------+------------------+---------+
|             Country|          avg(GDP)|avg(Year)|
+--------------------+------------------+---------+
|       Côte d'Ivoire|              null|   2007.5|
|                Chad| 484.7916989326477|   2007.5|
|Micronesia (Feder...|              null|   2007.5|
|            Paraguay|1983.4043321609497|   2007.5|
|               Yemen|              null|   2007.5|
+--------------------+------------------+---------+
only showing top 5 rows



In [37]:
# We can also group the DataFrame by multiple columns

df.groupBy('Country', 'Year').count().show(4)

+----------+----+-----+
|   Country|Year|count|
+----------+----+-----+
| Argentina|2010|    1|
|Azerbaijan|2012|    1|
|   Bahrain|2014|    1|
|  Colombia|2014|    1|
+----------+----+-----+
only showing top 4 rows



In [38]:
# filter() keeps only the rows that satisfy the given condition

df.filter(df['Country']=='Nepal').show()

+----+-------+----+---------+
| _c0|Country|Year|      GDP|
+----+-------+----+---------+
|1813|  Nepal|2015| 743.7653|
|1814|  Nepal|2014|  76.2387|
|1815|  Nepal|2013|688.61725|
|1816|  Nepal|2012| 681.7926|
|1817|  Nepal|2011| 692.1167|
|1818|  Nepal|2010|592.18353|
|1819|  Nepal|2009|  48.7299|
|1820|  Nepal|2008|473.84445|
|1821|  Nepal|2007|393.88434|
|1822|  Nepal|2006|348.63144|
|1823|  Nepal|2005|317.89197|
|1824|  Nepal|2004| 287.4156|
|1825|  Nepal|2003|253.72412|
|1826|  Nepal|2002|246.37563|
|1827|  Nepal|2001|248.61835|
|1828|  Nepal|2000|231.42554|
+----+-------+----+---------+



In [39]:
# Here is another selection using filter, this time with a numeric comparison

df.filter(df['GDP'] < 10).show()

+----+-----------+----+---------+
| _c0|    Country|Year|      GDP|
+----+-----------+----+---------+
| 205| Bangladesh|2002|4.6135745|
| 479|   Cambodia|2000| 3.685949|
|1672|  Mauritius|2010| 8.376432|
|2036|Philippines|2002|  1.68135|
|2158|     Rwanda|2008|5.6687264|
+----+-----------+----+---------+



In [40]:
# We can compute descriptive statistics on the filtered data

df.filter(df['Year']==2000).describe().show()

+-------+------------------+-----------+------+------------------+
|summary|               _c0|    Country|  Year|               GDP|
+-------+------------------+-----------+------+------------------+
|  count|               183|        183|   183|               154|
|   mean|1475.0273224043715|       null|2000.0|4708.5152019757725|
| stddev| 850.9962321664372|       null|   0.0|  9181.57371052946|
|    min|                15|Afghanistan|  2000|          3.685949|
|    max|              2937|   Zimbabwe|  2000|         48735.996|
+-------+------------------+-----------+------+------------------+



In [41]:
# Here is a more complex chain: filter the rows, select a column, then summarize it

df.filter(df['Year']==2000).select('GDP').summary().show()

+-------+------------------+
|summary|               GDP|
+-------+------------------+
|  count|               154|
|   mean|4708.5152019757725|
| stddev|  9181.57371052946|
|    min|          3.685949|
|    25%|         263.11246|
|    50%|         796.79376|
|    75%|         3364.4238|
|    max|         48735.996|
+-------+------------------+



In [42]:
# Even more complex: the same chain, restricted to the chosen statistics

df.filter(df['Year']==2000).select('GDP').summary('mean', 'stddev').show()

+-------+------------------+
|summary|               GDP|
+-------+------------------+
|   mean|4708.5152019757725|
| stddev|  9181.57371052946|
+-------+------------------+



In [43]:
# where() is the same as filter()

df.where(df['Year']==2000).summary('count').show()

+-------+---+-------+----+---+
|summary|_c0|Country|Year|GDP|
+-------+---+-------+----+---+
|  count|183|    183| 183|154|
+-------+---+-------+----+---+



In [44]:
# We can do simple arithmetic on a column of the DataFrame

df.select('Country', (df['Year'] - 2000)/100).show(5)

+-----------+---------------------+
|    Country|((Year - 2000) / 100)|
+-----------+---------------------+
|Afghanistan|                 0.15|
|Afghanistan|                 0.14|
|Afghanistan|                 0.13|
|Afghanistan|                 0.12|
|Afghanistan|                 0.11|
+-----------+---------------------+
only showing top 5 rows



In [45]:
# We can also do operations between two columns

df.select('Country', df['Year']/df['GDP']).show(4)

+-----------+------------------+
|    Country|      (Year / GDP)|
+-----------+------------------+
|Afghanistan|3.4488116639921658|
|Afghanistan|3.2871085290314612|
|Afghanistan|3.1864122637435255|
|Afghanistan| 3.003168920672033|
+-----------+------------------+
only showing top 4 rows



In [46]:
# withColumn() adds the result of such an operation as a new, named column

df.withColumn('new_column', df['Year']/df['GDP']).show(4)

+---+-----------+----+---------+------------------+
|_c0|    Country|Year|      GDP|        new_column|
+---+-----------+----+---------+------------------+
|  0|Afghanistan|2015| 584.2592|3.4488116639921658|
|  1|Afghanistan|2014|612.69653|3.2871085290314612|
|  2|Afghanistan|2013|  631.745|3.1864122637435255|
|  3|Afghanistan|2012|  669.959| 3.003168920672033|
+---+-----------+----+---------+------------------+
only showing top 4 rows



In [None]:
# A UDF (user-defined function) lets us apply our own Python function
# to the columns of a DataFrame.

from pyspark.sql.functions import udf

In [76]:
# First we create a function 

def is_developed(gdp):
    """Classify a GDP value into a coarse development category.

    Parameters
    ----------
    gdp : float or None
        The GDP value; None stands for a missing (null) value.

    Returns
    -------
    str or None
        'Undeveloped' when gdp < 450, 'Developing' when 450 <= gdp < 6000,
        'Developed' when gdp >= 6000, and None when gdp is None.
    """
    # The GDP column contains nulls, which arrive here as None. Comparing
    # None with a number raises TypeError, so missing values are passed through.
    if gdp is None:
        return None
    if gdp < 450:
        return 'Undeveloped'
    # The boundary value 450 belongs to 'Developing'; the previous
    # `450 < gdp < 6000` test let exactly 450 fall through to 'Developed'.
    if gdp < 6000:
        return 'Developing'
    return 'Developed'

In [85]:
# Now we wrap the function we defined in a UDF that returns a string

poor_udf = udf(is_developed, StringType())


# We create a new column by applying the UDF to the GDP column

df.withColumn('is_developed?', poor_udf(df['GDP'])).show(5)

+---+-----------+----+---------+-------------+
|_c0|    Country|Year|      GDP|is_developed?|
+---+-----------+----+---------+-------------+
|  0|Afghanistan|2015| 584.2592|   Developing|
|  1|Afghanistan|2014|612.69653|   Developing|
|  2|Afghanistan|2013|  631.745|   Developing|
|  3|Afghanistan|2012|  669.959|   Developing|
|  4|Afghanistan|2011| 63.53723|  Undeveloped|
+---+-----------+----+---------+-------------+
only showing top 5 rows



In [86]:
def how_long(year):
    """Label a year as 'Long ago' (before 2013) or 'Recent' (2013 or later).

    Parameters
    ----------
    year : int or None
        The year; None stands for a missing (null) value.

    Returns
    -------
    str or None
        'Long ago' when year < 2013, 'Recent' otherwise, and None when
        year is None.
    """
    # The column is declared nullable; a null arrives as None and would
    # raise TypeError in the comparison below, so pass it through instead.
    if year is None:
        return None
    return 'Long ago' if year < 2013 else 'Recent'
    
# Wrap how_long in a string-returning UDF and use it to add a 'how_long?' column
year_udf = udf(how_long, StringType())
df.withColumn('how_long?', year_udf(df['Year'])).show(5)

+---+-----------+----+---------+---------+
|_c0|    Country|Year|      GDP|how_long?|
+---+-----------+----+---------+---------+
|  0|Afghanistan|2015| 584.2592|   Recent|
|  1|Afghanistan|2014|612.69653|   Recent|
|  2|Afghanistan|2013|  631.745|   Recent|
|  3|Afghanistan|2012|  669.959| Long ago|
|  4|Afghanistan|2011| 63.53723| Long ago|
+---+-----------+----+---------+---------+
only showing top 5 rows



In [47]:
# Writing a function that takes a DataFrame as input and returns a DataFrame

def func(df):
    """Return the result of selecting only the 'GDP' column of ``df``."""
    gdp_only = df.select('GDP')
    return gdp_only

# Call the function directly on the DataFrame and show the first rows of the result
func(df).show(4)

+---------+
|      GDP|
+---------+
| 584.2592|
|612.69653|
|  631.745|
|  669.959|
+---------+
only showing top 4 rows



In [48]:
# transform() applies the given function to the DataFrame and returns its
# result; the output is the same as calling func(df) directly

df.transform(func).show(4)

+---------+
|      GDP|
+---------+
| 584.2592|
|612.69653|
|  631.745|
|  669.959|
+---------+
only showing top 4 rows



In [49]:
# Count the missing (null) values in every column of the DataFrame

for col_name in df.columns:
    is_missing = df[col_name].isNull()
    print(col_name, '\t\t\t', df.filter(is_missing).count())

_c0 			 0
Country 			 0
Year 			 0
GDP 			 448


In [50]:
# Fill the missing GDP values with 7500 (presumably chosen because it is
# close to the mean GDP of about 7483 reported by describe() earlier),
# then recount the nulls in every column to confirm none are left.

df_nonull = df.fillna({'GDP':7500})

for col_name in df_nonull.columns:
    print(col_name, '\t\t\t', df_nonull.filter(df_nonull[col_name].isNull()).count())

_c0 			 0
Country 			 0
Year 			 0
GDP 			 0


In [51]:
# We can split the dataset randomly according to the given weights
# (here 3:1:1); the second argument (42) is the random seed.
# This can be useful for cross validation.

train, valid, test = df.randomSplit([3.0, 1.0, 1.0], 42)

In [52]:
# Peek at the first rows of the test split
test.show(3)

+---+-----------+----+--------+
|_c0|    Country|Year|     GDP|
+---+-----------+----+--------+
|  2|Afghanistan|2013| 631.745|
|  6|Afghanistan|2009|445.8933|
|  8|Afghanistan|2007|369.8358|
+---+-----------+----+--------+
only showing top 3 rows



In [53]:
# NOTE(review): identical to the previous cell and prints the same rows;
# perhaps train or valid was meant to be shown here - confirm.
test.show(3)

+---+-----------+----+--------+
|_c0|    Country|Year|     GDP|
+---+-----------+----+--------+
|  2|Afghanistan|2013| 631.745|
|  6|Afghanistan|2009|445.8933|
|  8|Afghanistan|2007|369.8358|
+---+-----------+----+--------+
only showing top 3 rows

