In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start (or attach to) the Spark session used by the cells below.
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

In [4]:
# Read people.json into a DataFrame; no schema is supplied here, so column types are inferred.
df = spark.read.json('people.json')

In [6]:
# Print the DataFrame's rows as a text table.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [7]:
# Print each column's name, type and nullability.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [8]:
# Column names as a plain Python list.
df.columns

['age', 'name']

In [10]:
# describe() builds a summary DataFrame (count/mean/stddev/min/max); show() prints it.
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [11]:
from pyspark.sql.types import (StructField,StringType,
                               IntegerType,StructType)

In [13]:
# Explicit column definitions: StructField(name, type, nullable).
data_schema = [
    StructField('age', IntegerType(), True),
    StructField('name', StringType(), True),
]

In [14]:
# Wrap the field list in a StructType so it can be passed to a reader as a schema.
final_str = StructType(data_schema)

In [15]:
# Re-read the same file, this time applying the explicit schema instead of inferring one.
df = spark.read.json('people.json',schema=final_str)

In [16]:
# Confirm the explicit schema took effect: age is now integer rather than the inferred long.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [20]:
# Indexing a DataFrame by column name yields a Column object, not the column's data.
type(df['age'])

pyspark.sql.column.Column

In [23]:
# select() returns a new DataFrame holding only the requested column, which show() then prints.
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [26]:
# head(2) is indexed like a list here; each element is a Row object.
type(df.head(2)[0])

pyspark.sql.types.Row

In [28]:
# Project both columns; select() accepts the names as separate arguments as well as a list.
df.select('age', 'name').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [30]:
# withColumn returns a new DataFrame with the extra column; the result is only displayed,
# so df itself is left unchanged. A null age stays null after the multiplication.
df.withColumn('new_age',df['age']*2).show()

+----+-------+-------+
| age|   name|new_age|
+----+-------+-------+
|null|Michael|   null|
|  30|   Andy|     60|
|  19| Justin|     38|
+----+-------+-------+



In [32]:
# Renaming also returns a new DataFrame; it is displayed here but not assigned back to df.
df.withColumnRenamed('age','my_new_age').show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



In [33]:
# df still has only its original age and name columns.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [39]:
# Register df as a global temporary view named 'people'. Spark keeps global temp views in
# the system database `global_temp`, so SQL has to refer to it as global_temp.people.
df.createOrReplaceGlobalTempView('people')

In [44]:
# A global temp view must be qualified with the `global_temp` database; the unqualified
# name "people" is not resolved, which is why the original query failed.
result = spark.sql("select * from global_temp.people")

In [45]:
from pyspark.sql import SparkSession
# NOTE: if the session created earlier in this notebook is still running, getOrCreate()
# hands back that same session, so the 'ops' app name has no effect in that case.
spark = SparkSession.builder.appName('ops').getOrCreate()

In [46]:
# header=True takes column names from the first line; inferSchema=True lets Spark detect column types.
df = spark.read.csv('appl_stock.csv',inferSchema=True,header=True)

In [47]:
# Inspect the types that were inferred for the stock data.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [51]:
# Fetch the first three rows and display the first one, a Row object.
df.head(3)[0]

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [57]:
#df.filter("close < 500").select('open','close').show()

In [59]:
#df.filter(df['close'] < 500).select('Volume').show()

In [75]:
#df.filter((df['close'] < 200) & ~(df['open'] > 200) ).show()

In [65]:
# Keep rows whose Low equals 197.16 and collect them into a Python list.
# The column is written 'low' but matches the schema's Low column.
res = df.filter(df['low']==197.16).collect()

In [66]:
# collect() produced an ordinary Python list.
type(res)

list

In [67]:
# Display the collected rows (a single match).
res

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [73]:
# Take the only matching Row out of the list.
row = res[0]

In [74]:
# A Row can be indexed by field name directly, without converting it to a dict first.
row['Volume']

220441900

In [76]:
from pyspark.sql import SparkSession
# Session handle and sales data for the aggregation examples.
spark = SparkSession.builder.appName('aggs').getOrCreate()
df = spark.read.csv('sales_info.csv', header=True, inferSchema=True)

In [77]:
# Redundant: repeats the session setup already done in the previous cell.
spark = SparkSession.builder.appName('aggs').getOrCreate()

In [78]:
# Redundant: re-reads the same sales_info.csv that an earlier cell already loaded into df.
df = spark.read.csv('sales_info.csv',inferSchema=True,header=True)

In [80]:
#df.show()
# Schema inferred from sales_info.csv.
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [85]:
# Number of rows per company.
df.groupBy("Company").count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [87]:
# Dict form of agg: {column: aggregate}. Without a groupBy this is the maximum over the
# whole table. The column is written 'sales' but matches the schema's Sales column.
df.agg({'sales':'max'}).show()

+----------+
|max(sales)|
+----------+
|     870.0|
+----------+



In [88]:
# Keep the grouped object in a variable so an aggregation can be applied to it in the next cell.
group_data = df.groupby("Company")

In [89]:
# Largest single sale within each company.
group_data.agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [90]:
from pyspark.sql.functions import countDistinct,avg,stddev

In [94]:
df.select(stddev('Sales').alias('ANG_SALE')).show()

+------------------+
|          ANG_SALE|
+------------------+
|250.08742410799007|
+------------------+



In [95]:
from pyspark.sql.functions import format_number

In [101]:
# One-row DataFrame holding the standard deviation of Sales in a column named STD.
sale_std = df.select(stddev('Sales').alias('STD'))

In [103]:
# Display the value formatted to two decimal places; no alias is given, so the column
# header is the generated expression name.
sale_std.select(format_number('STD',2)).show()

+---------------------+
|format_number(STD, 2)|
+---------------------+
|               250.09|
+---------------------+



In [105]:
# Sort by Sales; passing just the column name sorts in ascending order.
df.orderBy("Sales").show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [107]:
# Descending order is requested on the Column object via .desc().
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [109]:
from pyspark.sql import SparkSession
# Session handle and the CSV used for the missing-data examples.
spark = SparkSession.builder.appName('aggs').getOrCreate()
df = spark.read.csv('ContainsNull.csv', header=True, inferSchema=True)

In [110]:
# Show the raw rows, including the null Name and Sales entries.
df.show()


+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [117]:
# Drop only the rows whose Sales is null; a null in Name alone does not remove a row.
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [118]:
# Check column types before filling: Name is a string column, Sales is a double.
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [121]:
# Replace nulls in Name with a placeholder string; nulls in Sales are left as they are.
df.na.fill('no_name',subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|no_name| null|
|emp3|no_name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [122]:
from pyspark.sql.functions import mean


In [123]:
# Compute the mean of Sales and collect the result to the driver.
# Null Sales values do not count towards the mean: (345.0 + 456.0) / 2 = 400.5.
mean_val = df.select(mean(df['Sales'])).collect()

In [124]:
# First field of the first collected row is the mean itself.
mean_sale = mean_val[0][0]

In [125]:
# Display the extracted mean as a plain Python number.
mean_sale

400.5

In [126]:
df.na.fill(mean_sale,subset=['Name']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [127]:
from pyspark.sql import SparkSession
# Session handle and the stock data reloaded for the date examples.
spark = SparkSession.builder.appName('date').getOrCreate()
df = spark.read.csv('appl_stock.csv', header=True, inferSchema=True)

In [128]:
# Peek at the first row; Date is parsed as a datetime value.
df.head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [130]:
from pyspark.sql.functions import dayofmonth,hour,dayofyear,year,weekofyear,format_number,date_format

In [135]:
#df.select(year (df ['date'])).show()

In [142]:
#df.select(year(df['Date'])).show()
# Add a 'year' column extracted from the Date timestamp; the result is a new DataFrame.
newdf = df.withColumn("year",year(df['Date']))

In [143]:
# Per-year averages of every numeric column. No action such as show() is called, so the
# cell displays only the resulting DataFrame's description, not its rows. The derived
# year column is averaged too, giving the avg(year) column.
newdf.groupby("Year").mean()

DataFrame[Year: int, avg(Open): double, avg(High): double, avg(Low): double, avg(Close): double, avg(Volume): double, avg(Adj Close): double, avg(year): double]