In [1]:
import findspark

In [2]:
findspark.init('/home/augustine/spark-2.4.7-bin-hadoop2.7')

In [3]:
import pyspark
import py4j

In [3]:
pwd

'/home/augustine'

In [5]:
from pyspark.sql import SparkSession

In [7]:
spark=SparkSession.builder.appName('Basics').getOrCreate()

In [8]:
df=spark.read.json('/home/augustine/spark-2.4.7-bin-hadoop2.7/python/test_support/sql/people.json')

In [9]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [10]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [11]:
df.columns

['age', 'name']

In [12]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [13]:
data_schema=[StructField('age',IntegerType(),True),
             StructField('name',StringType(),True)]

In [14]:
final_struc=StructType(fields=data_schema)

In [15]:
df=spark.read.json('/home/augustine/spark-2.4.7-bin-hadoop2.7/python/test_support/sql/people.json',schema=final_struc)

In [16]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [17]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [18]:
df.columns

['age', 'name']

In [19]:
type(df['age'])

pyspark.sql.column.Column

In [20]:
type(df.select('age'))

pyspark.sql.dataframe.DataFrame

In [21]:
df.head(2)[0]

Row(age=None, name='Michael')

In [22]:
df.select(['age','name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [23]:
new_df=df.withColumn('newage_double',df['age']*2)

In [24]:
new_df.show()

+----+-------+-------------+
| age|   name|newage_double|
+----+-------+-------------+
|null|Michael|         null|
|  30|   Andy|           60|
|  19| Justin|           38|
+----+-------+-------------+



In [25]:
df.withColumnRenamed('age','new_age').show()

+-------+-------+
|new_age|   name|
+-------+-------+
|   null|Michael|
|     30|   Andy|
|     19| Justin|
+-------+-------+



In [26]:
df.createOrReplaceTempView('people')


In [27]:
results=spark.sql('SELECT * from people')

In [28]:
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [29]:
spark2=SparkSession.builder.appName('ops').getOrCreate()

In [33]:
df2=spark2.read.csv('/home/augustine/spark-2.4.7-bin-hadoop2.7/python/test_support/sql/apple_stock_data.csv',inferSchema=True,header=True)

In [34]:
df2.show()

+-------------------+------+------+------+------+--------+---------+
|               Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+-------------------+------+------+------+------+--------+---------+
|2012-03-30 00:00:00|608.77|610.56|597.94|599.55|26050900|   599.55|
|2012-03-29 00:00:00|612.78|616.56|607.23|609.86|21668300|   609.86|
|2012-03-28 00:00:00|618.38|621.45|610.31|617.62|23385200|   617.62|
|2012-03-27 00:00:00|606.18|616.28|606.06|614.48|21628200|   614.48|
|2012-03-26 00:00:00|599.79|607.15|595.26|606.98|21259900|   606.98|
|2012-03-23 00:00:00|600.49| 601.8| 594.4|596.05|15359900|   596.05|
|2012-03-22 00:00:00|597.78| 604.5|595.53|599.34|22281100|   599.34|
|2012-03-21 00:00:00|602.74|609.65|601.41| 602.5|22958200|    602.5|
|2012-03-20 00:00:00|599.51| 606.9|591.48|605.96|29166500|   605.96|
|2012-03-19 00:00:00|598.37|601.77|589.05| 601.1|32187000|    601.1|
|2012-03-16 00:00:00|584.72| 589.2| 578.0|585.57|29481700|   585.57|
|2012-03-15 00:00:00|599.61|600.01

In [48]:
# df2.filter('Close<500').select('Open','Close').show()
df2.filter(df2['Close'] < 500).select('Open','Close').show()

+------+------+
|  Open| Close|
+------+------+
|514.26|497.67|
|490.96|493.42|
|480.76|493.17|
| 470.5|476.68|
|465.25|468.83|
|458.38|463.97|
| 457.3|459.68|
| 455.9|455.12|
|458.41|456.19|
|455.59|456.48|
|445.71|453.01|
|444.34|447.28|
|448.36|444.63|
|454.44|446.66|
| 425.1|420.41|
|422.67|427.41|
|427.49| 420.3|
|430.15|427.75|
|426.96|429.11|
| 424.2| 424.7|
+------+------+
only showing top 20 rows



In [71]:
df2.filter((df2['Close'] < 200)&(df2['Open'] > 200)).show()

+-------------------+------+------+------+------+--------+---------+
|               Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+-------------------+------+------+------+------+--------+---------+
|2010-01-29 00:00:00|201.08| 202.2|190.25|192.06|44498300|   192.06|
|2010-01-28 00:00:00|204.93| 205.5| 198.7|199.29|41910800|   199.29|
|2010-01-22 00:00:00|206.78| 207.5|197.16|197.75|31491700|   197.75|
|2009-12-01 00:00:00|202.24|202.77|196.83|196.97|16634400|   196.97|
|2009-11-30 00:00:00|201.11|201.68|198.77|199.91|15173500|   199.91|
|2009-10-27 00:00:00|201.66|202.81|196.45|197.37|27019700|   197.37|
|2009-10-20 00:00:00| 200.6|201.75|197.85|198.76|40751400|   198.76|
|2007-12-28 00:00:00|200.59|201.56|196.88|199.83|24987400|   199.83|
+-------------------+------+------+------+------+--------+---------+



In [1]:
#Examples..

In [None]:
# result2=df2.filter(df2['Low']==196.45).collect()

In [76]:
# result2[0].asDict()

In [82]:
# NEW

In [83]:
spark3=SparkSession.builder.appName('aggs').getOrCreate()

In [154]:
df3=spark3.read.csv('/home/augustine/spark-2.4.7-bin-hadoop2.7/python/test_support/sql/sales_info.csv',inferSchema=True,header=True)

In [155]:
df3.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [90]:
df3.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [91]:
df3.groupBy('Company').min().show()
# df3.groupBy('Company').max().show() - to show max

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



In [92]:
df3.groupBy('Company').sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [96]:
group_data=df3.groupBy('Company')

In [94]:
df3.agg({'Sales':'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [99]:
group_data.agg({'Sales':'sum'}).show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [100]:
group_data.agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [101]:
from pyspark.sql.functions  import countDistinct,avg,stddev

In [107]:
df3.select(stddev('Sales').alias('Standard_deviation')).show()
# examlpes
# df3.select(avg('Sales').alias('Average_values')).show()
# df3.select(countDistinct('Sales').alias('Distinct_values')).show()

+------------------+
|Standard_deviation|
+------------------+
|250.08742410799007|
+------------------+



In [132]:
from pyspark.sql.functions import format_number

In [140]:
std_data=df3.select(stddev('Sales').alias('std'))

In [141]:
std_data.select(format_number('std',2).alias('std1')).show()

+------+
|  std1|
+------+
|250.09|
+------+



In [142]:
# df3.select(format_number('Sales',2).alias('std')).show()

In [146]:
df3.orderBy(df3['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [6]:
from pyspark.sql import SparkSession

In [7]:
spark4=SparkSession.builder.appName('miss').getOrCreate()

In [8]:
df4=spark4.read.csv('/home/augustine/spark-2.4.7-bin-hadoop2.7/python/test_support/sql/ContainsNull.csv',inferSchema=True,header=True)

In [9]:
df4.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [10]:
df4.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [11]:
df4.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [12]:
df4.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [13]:
df4.na.drop(subset=['Name']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp4|Cindy|456.0|
+----+-----+-----+



In [14]:
df4.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [15]:
df4.na.fill('No Name').show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [16]:
df4.na.fill('No Name',subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [17]:
df4.na.fill(0,subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [18]:
from pyspark.sql.functions import mean

In [19]:
mean_val=df4.select(mean(df4['Sales'])).show()

+----------+
|avg(Sales)|
+----------+
|     400.5|
+----------+



In [20]:
mean_sales=400.5

In [21]:
df4.na.fill(mean_sales,subset=['Sales']).show() #or df4.na.fill(mean_sales,['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [5]:
from pyspark.sql import SparkSession

In [6]:
spark5=SparkSession.builder.appName('dates').getOrCreate()

In [7]:
df5=spark5.read.csv('/home/augustine/spark-2.4.7-bin-hadoop2.7/python/test_support/sql/apple_stock_data.csv',inferSchema=True,header=True)

In [8]:
df5.show()

+-------------------+------+------+------+------+--------+---------+
|               Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+-------------------+------+------+------+------+--------+---------+
|2012-03-30 00:00:00|608.77|610.56|597.94|599.55|26050900|   599.55|
|2012-03-29 00:00:00|612.78|616.56|607.23|609.86|21668300|   609.86|
|2012-03-28 00:00:00|618.38|621.45|610.31|617.62|23385200|   617.62|
|2012-03-27 00:00:00|606.18|616.28|606.06|614.48|21628200|   614.48|
|2012-03-26 00:00:00|599.79|607.15|595.26|606.98|21259900|   606.98|
|2012-03-23 00:00:00|600.49| 601.8| 594.4|596.05|15359900|   596.05|
|2012-03-22 00:00:00|597.78| 604.5|595.53|599.34|22281100|   599.34|
|2012-03-21 00:00:00|602.74|609.65|601.41| 602.5|22958200|    602.5|
|2012-03-20 00:00:00|599.51| 606.9|591.48|605.96|29166500|   605.96|
|2012-03-19 00:00:00|598.37|601.77|589.05| 601.1|32187000|    601.1|
|2012-03-16 00:00:00|584.72| 589.2| 578.0|585.57|29481700|   585.57|
|2012-03-15 00:00:00|599.61|600.01

In [9]:
df5.head(1)

[Row(Date=datetime.datetime(2012, 3, 30, 0, 0), Open=608.77, High=610.56, Low=597.94, Close=599.55, Volume=26050900, Adj Close=599.55)]

In [10]:
# df5.select(['Date','Open']).show()

In [11]:
from pyspark.sql.functions import dayofmonth,dayofyear,month,year,hour,weekofyear,format_number,date_format

In [12]:
df5.select(dayofmonth(df5['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|              30|
|              29|
|              28|
|              27|
|              26|
|              23|
|              22|
|              21|
|              20|
|              19|
|              16|
|              15|
|              14|
|              13|
|              12|
|               9|
|               8|
|               7|
|               6|
|               5|
+----------------+
only showing top 20 rows



In [13]:
# df5.select(hour(df5['Date'])).show()
# df5.select(month(df5['Date'])).show()
df5.select(year(df5['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
|      2012|
+----------+
only showing top 20 rows



In [14]:
newdf=df5.withColumn('Year',year(df5['Date']))

In [15]:
result=newdf.groupBy('Year').mean().select(['Year','avg(Close)'])

In [16]:
result.select(['Year',format_number('avg(Close)',2).alias('Average Closing Price')]).show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|1990|                37.56|
|2003|                18.54|
|2007|               128.27|
|2006|                70.81|
|1997|                17.97|
|1988|                41.54|
|1994|                34.08|
|2004|                35.53|
|1991|                52.50|
|1996|                24.92|
|1989|                41.66|
|1998|                30.57|
|1985|                20.19|
|2012|               503.68|
|1987|                53.89|
|2009|               146.81|
|1995|                40.54|
|2001|                20.22|
|1992|                54.80|
|2005|                52.40|
+----+---------------------+
only showing top 20 rows

