# PySpark 1.6 Workbook

In [3]:
import pyspark
from pyspark.sql import SQLContext

### Creating table df_rtable(raw table) using pyspark DataFrame

In [4]:
df_rtable = sqlContext.createDataFrame(
    [("Green", "1", "1.3", "DescriptionONE"),
     ("Green", "0", "1.445", "DescriptionONE"),
     ("Blue", "4", "1.2", "DescriptionTWO"),
     ("Red", "5", "1.3", "Description THREE"),
     ("Yellow", "7", "1.325", "Description Four"), 
    ("Red", "9", "1.4", "DescriptionONE"),
    ("Red", "6", "1.72158", "Description THREE"),
     ("Blue", "5", "1", "DescriptionONE")],
    ("Input 1", "Input 2", "Input 3", "Description"))

### To view the loaded raw table in data frame

In [5]:
df_rtable.show()

+-------+-------+-------+-----------------+
|Input 1|Input 2|Input 3|      Description|
+-------+-------+-------+-----------------+
|  Green|      1|    1.3|   DescriptionONE|
|  Green|      0|  1.445|   DescriptionONE|
|   Blue|      4|    1.2|   DescriptionTWO|
|    Red|      5|    1.3|Description THREE|
| Yellow|      7|  1.325| Description Four|
|    Red|      9|    1.4|   DescriptionONE|
|    Red|      6|1.72158|Description THREE|
|   Blue|      5|      1|   DescriptionONE|
+-------+-------+-------+-----------------+



### To the Schema of the loaded table (Gives details of column names, data types, and other restrictions).

In [6]:
df_rtable.printSchema()

root
 |-- Input 1: string (nullable = true)
 |-- Input 2: string (nullable = true)
 |-- Input 3: string (nullable = true)
 |-- Description: string (nullable = true)



### Importing library functions required for this project

In [7]:
from pyspark.sql.functions import upper

In [8]:
from pyspark.sql.functions import udf, col

### Python fucntion to check for spaces

In [9]:
def spacerem_upper(x):
    pos = x.find(' ')
    #print(pos)
    if pos != - 1:
        y = x[:pos] + x[pos+1 : ].upper()
        return y
    else:
        return x

### Converting to UDF

In [14]:
spaceDeleteUDF = udf(spacerem_upper)

### Checking the type

In [16]:
type(spaceDeleteUDF)

pyspark.sql.functions.UserDefinedFunction

### Applying UDF to "Description column" for the table

In [17]:
df_upper2 = df_rtable.withColumn("Description", spaceDeleteUDF("Description"))

### Displaying the table

In [18]:
df_upper2.show()

+-------+-------+-------+----------------+
|Input 1|Input 2|Input 3|     Description|
+-------+-------+-------+----------------+
|  Green|      1|    1.3|  DescriptionONE|
|  Green|      0|  1.445|  DescriptionONE|
|   Blue|      4|    1.2|  DescriptionTWO|
|    Red|      5|    1.3|DescriptionTHREE|
| Yellow|      7|  1.325| DescriptionFOUR|
|    Red|      9|    1.4|  DescriptionONE|
|    Red|      6|1.72158|DescriptionTHREE|
|   Blue|      5|      1|  DescriptionONE|
+-------+-------+-------+----------------+



### Importing packages required to perform other tasks

In [19]:
from pyspark.sql.functions import format_number

### Changing the format of a column in a table

In [12]:
df_upper3 =df_upper2.select(df_upper2['Input 1'], df_upper2['Input 2'], df_upper2['Input 3'].cast('float'), 
                            df_upper2['Description'])

In [39]:
#df_upper31 = df_upper2.withColumn('Input 3', df_upper2['Input 3'].cast('float'))

In [40]:
#df_upper31.printSchema()

root
 |-- Input 1: string (nullable = true)
 |-- Input 2: string (nullable = true)
 |-- Input 3: float (nullable = true)
 |-- Description: string (nullable = true)



In [41]:
#df_upper31.show()

+-------+-------+-------+----------------+
|Input 1|Input 2|Input 3|     Description|
+-------+-------+-------+----------------+
|  Green|      1|    1.3|  DescriptionONE|
|  Green|      0|  1.445|  DescriptionONE|
|   Blue|      4|    1.2|  DescriptionTWO|
|    Red|      5|    1.3|DescriptionTHREE|
| Yellow|      7|  1.325| DescriptionFOUR|
|    Red|      9|    1.4|  DescriptionONE|
|    Red|      6|1.72158|DescriptionTHREE|
|   Blue|      5|    1.0|  DescriptionONE|
+-------+-------+-------+----------------+



In [48]:
#df_upper4 = df_upper31.withColumn('Input 3',format_number(df_upper31['Input 3'],4))

In [50]:
df_upper41 = df_upper2.withColumn('Input 3', format_number((df_upper2['Input 3'].cast('float')),4))

### Checking the schemas for the columns in a table

In [51]:
df_upper41.printSchema()

root
 |-- Input 1: string (nullable = true)
 |-- Input 2: string (nullable = true)
 |-- Input 3: string (nullable = true)
 |-- Description: string (nullable = true)



### Displaying the table

In [52]:
df_upper41.show()

+-------+-------+-------+----------------+
|Input 1|Input 2|Input 3|     Description|
+-------+-------+-------+----------------+
|  Green|      1| 1.3000|  DescriptionONE|
|  Green|      0| 1.4450|  DescriptionONE|
|   Blue|      4| 1.2000|  DescriptionTWO|
|    Red|      5| 1.3000|DescriptionTHREE|
| Yellow|      7| 1.3250| DescriptionFOUR|
|    Red|      9| 1.4000|  DescriptionONE|
|    Red|      6| 1.7216|DescriptionTHREE|
|   Blue|      5| 1.0000|  DescriptionONE|
+-------+-------+-------+----------------+



In [18]:
#from pyspark.sql import SQLContext
#df = data.selectExpr("Name as name", "askdaosdka as age")

### Creating a new dataframe

In [53]:
df_rtable2 = sqlContext.createDataFrame([("Green", "Night"),
                                        ("Yellow", "Morning"),
                                        ("Red", "Afternoon"),
                                        ("Blue","Evening")])

In [54]:
df_rtable2.show()

+------+---------+
|    _1|       _2|
+------+---------+
| Green|    Night|
|Yellow|  Morning|
|   Red|Afternoon|
|  Blue|  Evening|
+------+---------+



In [55]:
df_rtable2.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)



In [66]:
df_rtable3 = df_rtable2.withColumnRenamed('_1','Input 1').withColumnRenamed('_2','Day Period')

In [69]:
df_rtable3.show()

+-------+----------+
|Input 1|Day Period|
+-------+----------+
|  Green|     Night|
| Yellow|   Morning|
|    Red| Afternoon|
|   Blue|   Evening|
+-------+----------+



In [80]:
df_rtable31 = df_rtable2.withColumnRenamed('_1','Input 5').withColumnRenamed('_2','Day Period')

In [84]:
df_rtable31.show()

+-------+----------+
|Input 5|Day Period|
+-------+----------+
|  Green|     Night|
| Yellow|   Morning|
|    Red| Afternoon|
|   Blue|   Evening|
+-------+----------+



In [77]:
leftjoin=df_upper4.join(df_rtable3,['Input 1'],"left")

In [71]:
leftjoin.show()

+-------+-------+-------+----------------+----------+
|Input 1|Input 2|Input 3|     Description|Day Period|
+-------+-------+-------+----------------+----------+
|   Blue|      4| 1.2000|  DescriptionTWO|   Evening|
|   Blue|      5| 1.0000|  DescriptionONE|   Evening|
|  Green|      1| 1.3000|  DescriptionONE|     Night|
|  Green|      0| 1.4450|  DescriptionONE|     Night|
| Yellow|      7| 1.3250| DescriptionFOUR|   Morning|
|    Red|      5| 1.3000|DescriptionTHREE| Afternoon|
|    Red|      9| 1.4000|  DescriptionONE| Afternoon|
|    Red|      6| 1.7216|DescriptionTHREE| Afternoon|
+-------+-------+-------+----------------+----------+



# For random generation of Dates

In [26]:
from datetime import datetime
#import random

In [27]:
from pyspark.sql.functions import date_format
from pyspark.sql.functions import lit
import random

In [28]:
from pyspark.sql.types import *

### Python function to create random dates

In [29]:
def date_rand(a):
    date_YYYY = str(random.randint(1950, 2000))
    date_MM   = str(random.randint(01, 12)).zfill(02)
    date_DD   = str(random.randint(01, 28)).zfill(02)
    date_sep  = '-'
    return  date_YYYY + date_MM + date_DD 

udfdate_rand = udf(date_rand, StringType())

In [30]:
leftjoin = leftjoin.withColumn("Date",lit(0))

In [31]:
leftjoin_w_date = leftjoin.withColumn("Date", udfdate_rand("Date"))

In [32]:
leftjoin_w_date.show()


+-------+-------+-------+----------------+----------+--------+
|Input 1|Input 2|Input 3|     Description|Day Period|    Date|
+-------+-------+-------+----------------+----------+--------+
|   Blue|      4| 1.2000|  DescriptionTWO|   Evening|19921008|
|   Blue|      5| 1.0000|  DescriptionONE|   Evening|19750312|
|  Green|      1| 1.3000|  DescriptionONE|     Night|19921008|
|  Green|      0| 1.4450|  DescriptionONE|     Night|19750312|
| Yellow|      7| 1.3250| DescriptionFOUR|   Morning|19510116|
|    Red|      5| 1.3000|DescriptionTHREE| Afternoon|19510116|
|    Red|      9| 1.4000|  DescriptionONE| Afternoon|19540723|
|    Red|      6| 1.7216|DescriptionTHREE| Afternoon|19590414|
+-------+-------+-------+----------------+----------+--------+



In [33]:
leftjoin_w_date.printSchema()

root
 |-- Input 1: string (nullable = true)
 |-- Input 2: string (nullable = true)
 |-- Input 3: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Day Period: string (nullable = true)
 |-- Date: string (nullable = true)



In [34]:
leftjoin_w_date = leftjoin_w_date.withColumn("Input 3",leftjoin_w_date["Input 3"].cast('float'))

In [35]:
leftjoin_w_date.printSchema()

root
 |-- Input 1: string (nullable = true)
 |-- Input 2: string (nullable = true)
 |-- Input 3: float (nullable = true)
 |-- Description: string (nullable = true)
 |-- Day Period: string (nullable = true)
 |-- Date: string (nullable = true)



In [36]:
df_calc =leftjoin_w_date.filter((leftjoin_w_date['Input 3'] < 1.31 ) 
                                 & (leftjoin_w_date['Input 1'] != 'Red') 
                                 & (leftjoin_w_date['Input 1'] != 'Green'))

In [37]:
df_calc.show()

+-------+-------+-------+--------------+----------+--------+
|Input 1|Input 2|Input 3|   Description|Day Period|    Date|
+-------+-------+-------+--------------+----------+--------+
|   Blue|      4|    1.2|DescriptionTWO|   Evening|19561222|
|   Blue|      5|    1.0|DescriptionONE|   Evening|19940610|
+-------+-------+-------+--------------+----------+--------+



In [38]:
df_calc.printSchema()

root
 |-- Input 1: string (nullable = true)
 |-- Input 2: string (nullable = true)
 |-- Input 3: float (nullable = true)
 |-- Description: string (nullable = true)
 |-- Day Period: string (nullable = true)
 |-- Date: string (nullable = true)



In [39]:
date_list = leftjoin_w_date.select('Date').collect()

date_array = [int(i.Date) for i in date_list]

In [40]:
type(date_array)

list

In [41]:
defg = sorted(date_array)
defg

[19731015,
 19780121,
 19870717,
 19870717,
 19930415,
 19930415,
 19960425,
 19960425]

In [42]:
len(defg)

8

In [43]:
if len(defg)%2 == 0:
    date_cmp1 = defg[(len(defg)/2)-1]
    date_cmp2 = defg[(len(defg)/2)]
    if date_cmp1 == date_cmp2:
        date_cmp = date_cmp1
    else:
        date_cmp = date_cmp1
else:
    date_cmp = defg[(len(defg)/2)-1]

In [44]:
date_cmp

19870717

In [45]:
import pyspark.sql.functions as F

In [46]:
leftjoin_w_date.printSchema()

root
 |-- Input 1: string (nullable = true)
 |-- Input 2: string (nullable = true)
 |-- Input 3: float (nullable = true)
 |-- Description: string (nullable = true)
 |-- Day Period: string (nullable = true)
 |-- Date: string (nullable = true)



In [47]:
leftjoin_w_date = leftjoin_w_date.withColumn("Date",leftjoin_w_date["Date"].cast('int')).withColumn("Input 2",leftjoin_w_date["Input 2"].cast('int'))

In [48]:
leftjoin_w_date.printSchema()

root
 |-- Input 1: string (nullable = true)
 |-- Input 2: integer (nullable = true)
 |-- Input 3: float (nullable = true)
 |-- Description: string (nullable = true)
 |-- Day Period: string (nullable = true)
 |-- Date: integer (nullable = true)



In [49]:
def flag_func(a,b):
    if a > date_cmp and b > 1:
        flag = 1 
    else:
        flag = 0 
    return flag

In [50]:
udf_falg_func = udf(flag_func)

In [51]:
leftjoin_w_flag = leftjoin_w_date.withColumn('flag', udf_falg_func("Date","Input 2"))

In [52]:
leftjoin_w_flag.show()

+-------+-------+-------+----------------+----------+--------+----+
|Input 1|Input 2|Input 3|     Description|Day Period|    Date|flag|
+-------+-------+-------+----------------+----------+--------+----+
|   Blue|      4|    1.2|  DescriptionTWO|   Evening|19680522|   0|
|   Blue|      5|    1.0|  DescriptionONE|   Evening|19660822|   0|
|  Green|      1|    1.3|  DescriptionONE|     Night|20000314|   0|
|  Green|      0|  1.445|  DescriptionONE|     Night|19920608|   0|
| Yellow|      7|  1.325| DescriptionFOUR|   Morning|19690328|   0|
|    Red|      5|    1.3|DescriptionTHREE| Afternoon|19931126|   1|
|    Red|      9|    1.4|  DescriptionONE| Afternoon|19570114|   0|
|    Red|      6| 1.7216|DescriptionTHREE| Afternoon|19700807|   0|
+-------+-------+-------+----------------+----------+--------+----+



In [53]:
df1 = leftjoin_w_flag.groupby('Description').agg(F.sum("Input 3")/(F.min("Input 2"))).alias('sumbymin')

In [54]:
df1.show()

+----------------+-----------------------------------------------------------------------------------------------+
|     Description|((sum(Input 3),mode=Complete,isDistinct=false) / (min(Input 2),mode=Complete,isDistinct=false))|
+----------------+-----------------------------------------------------------------------------------------------+
|  DescriptionONE|                                                                                           null|
|DescriptionTHREE|                                                                             0.6043200016021728|
| DescriptionFOUR|                                                                             0.1892857210976737|
|  DescriptionTWO|                                                                            0.30000001192092896|
+----------------+-----------------------------------------------------------------------------------------------+



In [55]:
def function_5_to_10(a,b):
    df_upper4  = a
    df_rtable2 = b
    
    df_rtable3 = df_rtable2.withColumnRenamed('_1','Input 1').withColumnRenamed('_2','Day Period')
    
    df_rtable3.show()
    
    leftjoin = df_upper4.join(df_rtable3,['Input 1'],"left")
    
    leftjoin.show()
    
    def date_rand(a):
        date_YYYY = str(random.randint(1950, 2000))
        date_MM   = str(random.randint(01, 12)).zfill(02)
        date_DD   = str(random.randint(01, 28)).zfill(02)
        date_sep  = '-'
        return  date_YYYY + date_MM + date_DD 

    udfdate_rand = udf(date_rand, StringType())
    
    leftjoin = leftjoin.withColumn("Date",lit(0))
    
    leftjoin_w_date = leftjoin.withColumn("Date", udfdate_rand("Date"))
    
    leftjoin_w_date = leftjoin_w_date.withColumn("Input 3",leftjoin_w_date["Input 3"].cast('float'))
    
    leftjoin_w_date.show()
    
    df_calc =leftjoin_w_date.filter((leftjoin_w_date['Input 3'] < 1.31 ) 
                                 & (leftjoin_w_date['Input 1'] != 'Red') 
                                 & (leftjoin_w_date['Input 1'] != 'Green'))
    
    df_calc.show()
    
    date_list = leftjoin_w_date.select('Date').collect()

    date_array = [int(i.Date) for i in date_list]
    
    defg = sorted(date_array)
    
    if len(defg)%2 == 0:
        date_cmp1 = defg[(len(defg)/2)-1]
        date_cmp2 = defg[(len(defg)/2)]
        if date_cmp1 == date_cmp2:
            date_cmp = date_cmp1
        else:
            date_cmp = date_cmp1
    else:
        date_cmp = defg[(len(defg)/2)-1]
        
    leftjoin_w_date = leftjoin_w_date.withColumn("Date",leftjoin_w_date["Date"].cast('int'))
    leftjoin_w_date = leftjoin_w_date.withColumn("Input 2",leftjoin_w_date["Input 2"].cast('int'))
    
    def flag_func(a,b):
        if a > date_cmp and b > 1:
            flag = 1 
        else:
            flag = 0 
        return flag
    
    udf_falg_func = udf(flag_func)
    
    leftjoin_w_flag = leftjoin_w_date.withColumn('flag', udf_falg_func("Date","Input 2"))
    
    leftjoin_w_flag.show()
    
    df1 = leftjoin_w_flag.groupby('Description').agg(F.sum("Input 3")/(F.min("Input 2"))).alias('sumbymin')
    df1.show()


In [56]:
a = function_5_to_10(df_upper4,df_rtable2)

+-------+----------+
|Input 1|Day Period|
+-------+----------+
|  Green|     Night|
| Yellow|   Morning|
|    Red| Afternoon|
|   Blue|   Evening|
+-------+----------+

+-------+-------+-------+----------------+----------+
|Input 1|Input 2|Input 3|     Description|Day Period|
+-------+-------+-------+----------------+----------+
|   Blue|      4| 1.2000|  DescriptionTWO|   Evening|
|   Blue|      5| 1.0000|  DescriptionONE|   Evening|
|  Green|      1| 1.3000|  DescriptionONE|     Night|
|  Green|      0| 1.4450|  DescriptionONE|     Night|
| Yellow|      7| 1.3250| DescriptionFOUR|   Morning|
|    Red|      5| 1.3000|DescriptionTHREE| Afternoon|
|    Red|      9| 1.4000|  DescriptionONE| Afternoon|
|    Red|      6| 1.7216|DescriptionTHREE| Afternoon|
+-------+-------+-------+----------------+----------+

+-------+-------+-------+----------------+----------+--------+
|Input 1|Input 2|Input 3|     Description|Day Period|    Date|
+-------+-------+-------+----------------+----------+---