In [7]:
from datetime import datetime,date
import pandas as pd

##### Creating Session

In [1]:
from pyspark.sql import SparkSession

spark =SparkSession.builder.appName('practise').getOrCreate()

In [2]:
spark

In [9]:
#import
df_spark=spark.read.csv('data-20210604T074015Z-001/data/accounts.csv')

In [10]:
df_spark.show()

+----------------+-------+---------+
|             _c0|    _c1|      _c2|
+----------------+-------+---------+
|         account|revenue|employees|
|      Sunnamplex|4592.96|  13938.0|
|           Silis|5339.57|  18053.0|
|    Groovestreet|2728.86|   6486.0|
|         Donware|2009.52|   3409.0|
|Wonka Industries|4962.27|   4687.0|
|        Faxquote|4939.54|  17075.0|
|      Zathunicon|2913.82|   7332.0|
|      Stanredtax|  14.79|     26.0|
|          Cheers|1124.33|   1531.0|
|     Good Burger| 247.91|    304.0|
|      Lexiqvolax| 652.53|   1644.0|
|         Nam-zim| 369.58|   1167.0|
|        Ron-tech|3805.02|   7596.0|
|         Finhigh| 398.18|   1311.0|
|          Sonron| 588.46|   1668.0|
|         Domzoom|2238.78|   3439.0|
|        Gogozoom| 3577.1|  10379.0|
|        Scotfind|3911.27|   8284.0|
|       Ganjaflex|  161.8|    240.0|
+----------------+-------+---------+
only showing top 20 rows



In [12]:
df_spark.columns

['_c0', '_c1', '_c2']

In [14]:
rdd=spark.sparkContext.parallelize([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])

In [17]:
df = spark.createDataFrame(rdd,schema=['a','b','c','d','e'])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [18]:
#To view
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [19]:
#Print schema of rdd
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [20]:
df.a     #X

Column<'a'>

In [51]:
df.select('a').show()

+---+
|  a|
+---+
|  1|
|  2|
|  3|
+---+



In [50]:
type(df)         #Datatype

pyspark.sql.dataframe.DataFrame

In [52]:
df.dtypes

[('a', 'bigint'),
 ('b', 'double'),
 ('c', 'string'),
 ('d', 'date'),
 ('e', 'timestamp')]

In [30]:
df.select(df.a > 2).show()

+-------+
|(a > 2)|
+-------+
|  false|
|  false|
|   true|
+-------+



##### Filter

In [31]:
df.filter(df.a==1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



In [114]:
df.filter(df.a==1).select(['a','e']).show()

+---+-------------------+
|  a|                  e|
+---+-------------------+
|  1|2000-01-01 12:00:00|
+---+-------------------+



In [34]:
df.select('a','c').describe().show()

+-------+---+-------+
|summary|  a|      c|
+-------+---+-------+
|  count|  3|      3|
|   mean|2.0|   null|
| stddev|1.0|   null|
|    min|  1|string1|
|    max|  3|string3|
+-------+---+-------+



In [33]:
df.describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



#### Adding and Delecting column

In [55]:
x=df.withColumn('aa',df['a']*df['a'])
x.show()

+---+---+-------+----------+-------------------+---+
|  a|  b|      c|         d|                  e| aa|
+---+---+-------+----------+-------------------+---+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|  1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|  4|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|  9|
+---+---+-------+----------+-------------------+---+



In [57]:
#Delete
x=x.drop('aa')
x.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



#### Feature Engineering


In [63]:
from pyspark.sql import Row

In [88]:
newRow=spark.createDataFrame([Row(a=4,b=1.1,c='abhishek',d='NaN',e='NaN')])

In [74]:
df=df.union(newRow)

In [75]:
df.show()

+---+---+--------+----------+-------------------+
|  a|  b|       c|         d|                  e|
+---+---+--------+----------+-------------------+
|  1|2.0| string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0| string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0| string3|2000-03-01|2000-01-03 12:00:00|
|  4|1.1|abhishek|       NaN|                NaN|
+---+---+--------+----------+-------------------+



In [91]:
#Handling missing values    

In [94]:
df=df.na.replace('NaN',None)      #Replacing
df.show()

+---+---+--------+----------+-------------------+
|  a|  b|       c|         d|                  e|
+---+---+--------+----------+-------------------+
|  1|2.0| string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0| string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0| string3|2000-03-01|2000-01-03 12:00:00|
|  4|1.1|abhishek|      null|               null|
+---+---+--------+----------+-------------------+



In [106]:
df.na.drop(how='any').show() #Drop

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [113]:
df.na.fill('helloworld').show()

+---+---+--------+----------+-------------------+
|  a|  b|       c|         d|                  e|
+---+---+--------+----------+-------------------+
|  1|2.0| string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0| string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0| string3|2000-03-01|2000-01-03 12:00:00|
|  4|1.1|abhishek|helloworld|         helloworld|
+---+---+--------+----------+-------------------+



## Function in pyspark

In [41]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [38]:
@pandas_udf('long')
def pandas_square(series:pd.Series)->pd.Series:
    return series*series

In [39]:
df.select(pandas_square(df.a)).show()

+----------------+
|pandas_square(a)|
+----------------+
|               1|
|               4|
|               9|
+----------------+



In [47]:
def pandas_filter_fun(iterator):
    for i in iterator:
        yield i[i.a == 2]

In [48]:
df.mapInPandas(pandas_filter_fun,schema=df.schema).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+



### Groupby

In [115]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [116]:
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
|black|    6.0|   60.0|
| blue|    3.0|   30.0|
+-----+-------+-------+



In [118]:
df.groupby(['color','fruit']).avg().show()

+-----+------+-------+-------+
|color| fruit|avg(v1)|avg(v2)|
+-----+------+-------+-------+
| blue| grape|    4.0|   40.0|
|black|carrot|    6.0|   60.0|
|  red|banana|    4.0|   40.0|
|  red| grape|    8.0|   80.0|
|  red|carrot|    4.0|   40.0|
| blue|banana|    2.0|   20.0|
+-----+------+-------+-------+



In [120]:
def pandas_mean(data):
    return data.assign(v1=(data.v2+data.v1)/2)

In [121]:
df.groupby('color').applyInPandas(pandas_mean,schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  5| 10|
|  red|carrot| 16| 30|
|  red|carrot| 27| 50|
|  red|banana| 38| 70|
|  red| grape| 44| 80|
|black|carrot| 33| 60|
| blue|banana| 11| 20|
| blue| grape| 22| 40|
+-----+------+---+---+



In [124]:
df.write.csv('foo.csv',header=True)

In [125]:
spark.read.csv('foo.csv',header=True).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|banana|  7| 70|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|  red|banana|  1| 10|
|  red|carrot|  3| 30|
|  red| grape|  8| 80|
+-----+------+---+---+



In [126]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+

