In [65]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("app").getOrCreate()

In [7]:
df = spark.read.json("./Python-and-Spark-for-Big-Data-master/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json")

In [8]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [9]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [11]:
df.columns

['age', 'name']

In [13]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [14]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [34]:
data_schema = [StructField("age", IntegerType(), nullable = True),
              StructField("name", StringType(), nullable = True)]

In [35]:
final_struc = StructType(fields=data_schema)

In [36]:
df = spark.read.json(
    "./Python-and-Spark-for-Big-Data-master/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json", 
    schema=final_struc
)

In [38]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [39]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [41]:
df["age"].show()

TypeError: 'Column' object is not callable

In [42]:
df.select("age").show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [43]:
type(df["age"]), type(df.select("age"))

(pyspark.sql.column.Column, pyspark.sql.dataframe.DataFrame)

In [48]:
df.head(2)[0], type(df.head(2)[0])

(Row(age=None, name='Michael'), pyspark.sql.types.Row)

In [50]:
df.withColumn("new_age", df["age"]*2).show()

+----+-------+-------+
| age|   name|new_age|
+----+-------+-------+
|null|Michael|   null|
|  30|   Andy|     60|
|  19| Justin|     38|
+----+-------+-------+



In [58]:
from pyspark.sql.functions import *
df.withColumn("Name", concat(df["Name"], lit("_lastname"))).show()

+----+----------------+
| age|            Name|
+----+----------------+
|null|Michael_lastname|
|  30|   Andy_lastname|
|  19| Justin_lastname|
+----+----------------+



In [60]:
df.withColumnRenamed("Name", "Name_last").show()

+----+---------+
| age|Name_last|
+----+---------+
|null|  Michael|
|  30|     Andy|
|  19|   Justin|
+----+---------+



In [64]:
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [73]:
df = spark.read.csv("./Python-and-Spark-for-Big-Data-master/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv", 
              inferSchema=True,
                header=True)

In [74]:
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [75]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [78]:
df.filter("Date < 2010-01-28")

AnalysisException: [DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES] Cannot resolve "(Date < ((2010 - 1) - 28))" due to data type mismatch: the left and right operands of the binary operator have incompatible types ("DATE" and "INT").; line 1 pos 0;
'Filter (Date#652 < ((2010 - 1) - 28))
+- Relation [Date#652,Open#653,High#654,Low#655,Close#656,Volume#657,Adj Close#658] csv


In [86]:
df.filter(df["Date"] < "2010-01-28").show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [83]:
df.filter("Close < 500").select(["Date", "Close"]).show(2)

+----------+----------+
|      Date|     Close|
+----------+----------+
|2010-01-04|214.009998|
|2010-01-05|214.379993|
+----------+----------+
only showing top 2 rows



In [84]:
df.filter(df["Close"] < 500).select(["Date", "Close"]).show(2)

+----------+----------+
|      Date|     Close|
+----------+----------+
|2010-01-04|214.009998|
|2010-01-05|214.379993|
+----------+----------+
only showing top 2 rows



In [95]:
df.filter((df["Close"] < 500) & (df["Close"]>495)).select(["Date", "Close"]).show()

+----------+------------------+
|      Date|             Close|
+----------+------------------+
|2012-02-15|        497.669975|
|2013-08-14|        498.500008|
|2013-08-15|497.90998099999996|
|2013-09-04|        498.690025|
|2013-09-05|495.26997400000005|
|2013-09-06|498.22000099999997|
|2013-10-14|        496.039978|
|2013-10-15|        498.679985|
|2014-01-30|        499.779984|
+----------+------------------+



In [104]:
df.take(1)[0].asDict()

{'Date': datetime.date(2010, 1, 4),
 'Open': 213.429998,
 'High': 214.499996,
 'Low': 212.38000099999996,
 'Close': 214.009998,
 'Volume': 123432400,
 'Adj Close': 27.727039}

In [108]:
df = spark.read.csv("./Python-and-Spark-for-Big-Data-master/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/sales_info.csv", 
                   inferSchema=True, header=True)

In [109]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [110]:
df.show(3)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
+-------+-------+-----+
only showing top 3 rows



In [120]:
df.groupBy("Company").count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [121]:
df.groupBy("Company").sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [122]:
df.groupBy("Company").max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [126]:
df.agg({"Sales": "sum"}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [127]:
df.agg({"Sales": "count"}).show()

+------------+
|count(Sales)|
+------------+
|          12|
+------------+



In [139]:
df.groupBy("Company").agg({"Sales": "sum"}).show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [137]:
df.select("Company").distinct().show()

+-------+
|Company|
+-------+
|   APPL|
|   GOOG|
|     FB|
|   MSFT|
+-------+



In [141]:
df.select(pyspark.sql.functions.avg("Sales").alias("Avg_sales")).show()

+-----------------+
|        Avg_sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [143]:
df.select(pyspark.sql.functions.format_number(pyspark.sql.functions.avg("Sales"), 2).alias("Avg_sales")).show()

+---------+
|Avg_sales|
+---------+
|   360.58|
+---------+



In [145]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [148]:
df.groupBy(["Company", "Person"]).sum().show()

+-------+-------+----------+
|Company| Person|sum(Sales)|
+-------+-------+----------+
|   GOOG|    Sam|     200.0|
|     FB|  Sarah|     350.0|
|   APPL|  Chris|     350.0|
|   MSFT|Vanessa|     243.0|
|   APPL|   John|     250.0|
|   APPL|  Linda|     130.0|
|   GOOG|Charlie|     120.0|
|   APPL|   Mike|     750.0|
|     FB|   Carl|     870.0|
|   MSFT|   Tina|     600.0|
|   MSFT|    Amy|     124.0|
|   GOOG|  Frank|     340.0|
+-------+-------+----------+



In [149]:
df.groupBy(["Company", "Person"]).sum().show()

+-------+-------+----------+
|Company| Person|sum(Sales)|
+-------+-------+----------+
|   GOOG|    Sam|     200.0|
|     FB|  Sarah|     350.0|
|   APPL|  Chris|     350.0|
|   MSFT|Vanessa|     243.0|
|   APPL|   John|     250.0|
|   APPL|  Linda|     130.0|
|   GOOG|Charlie|     120.0|
|   APPL|   Mike|     750.0|
|     FB|   Carl|     870.0|
|   MSFT|   Tina|     600.0|
|   MSFT|    Amy|     124.0|
|   GOOG|  Frank|     340.0|
+-------+-------+----------+



In [151]:
df.orderBy("Sales").show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [154]:
df.orderBy("Sales", ascending=False).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [156]:
df.orderBy(df["Sales"].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [157]:
df = spark.read.csv("./Python-and-Spark-for-Big-Data-master/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/ContainsNull.csv",
                   inferSchema=True, header=True)

In [158]:
df.show(2)

+----+----+-----+
|  Id|Name|Sales|
+----+----+-----+
|emp1|John| null|
|emp2|null| null|
+----+----+-----+
only showing top 2 rows



In [159]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [160]:
df.count()

4

In [178]:
df.select(df["Name"].isNull().cast("integer").alias("null")).agg({"null": "sum"}).show()

+---------+
|sum(null)|
+---------+
|        2|
+---------+



In [182]:
df.select(pyspark.sql.functions.sum(df["Name"].isNull().cast("integer"))).show()

+--------------------------------+
|sum(CAST((Name IS NULL) AS INT))|
+--------------------------------+
|                               2|
+--------------------------------+



In [183]:
df.select(pyspark.sql.functions.sum(df["Name"].isNull().cast("integer"))).show()

+--------------------------------+
|sum(CAST((Name IS NULL) AS INT))|
+--------------------------------+
|                               2|
+--------------------------------+



In [184]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [186]:
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [187]:
df.na.drop(how="all").show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [206]:
df.na.drop(thresh=2).show() # thresh count not null < threshold values in ROW
# We can see all rows have atleast 2 not null values

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [209]:
df.na.drop(subset="Sales").show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [211]:
df.na.fill("Filled").show()

+----+------+-----+
|  Id|  Name|Sales|
+----+------+-----+
|emp1|  John| null|
|emp2|Filled| null|
|emp3|Filled|345.0|
|emp4| Cindy|456.0|
+----+------+-----+



In [212]:
df.na.fill(10).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| 10.0|
|emp2| null| 10.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [224]:
mean = df.select(pyspark.sql.functions.mean(df["Sales"])).collect()
print(mean)
mean_value = mean[0][0]
print(mean_value)

[Row(avg(Sales)=400.5)]
400.5


In [247]:
df.na.fill(mean_value, subset="Sales").show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [246]:
df.select("sales").agg({"sales": "mean"}).first()[0]

400.5

In [250]:
df = spark.read.csv("./Python-and-Spark-for-Big-Data-master/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv", 
                   inferSchema=True, header=True)

In [251]:
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [253]:
df = df.select(["Date", "Volume"])

In [298]:
df.orderBy(df["Date"].desc()).show(2)

+----------+--------+
|      Date|  Volume|
+----------+--------+
|2016-12-30|30586300|
|2016-12-29|15039500|
+----------+--------+
only showing top 2 rows



In [257]:
from pyspark.sql.functions import dayofmonth, hour, dayofyear, month, year, weekofyear, date_format, format_number

In [263]:
df.select(["Date", dayofmonth(df["Date"])]).show(2)

+----------+----------------+
|      Date|dayofmonth(Date)|
+----------+----------------+
|2010-01-04|               4|
|2010-01-05|               5|
+----------+----------------+
only showing top 2 rows



In [264]:
df.select(["Date", month(df["Date"])]).show(2)

+----------+-----------+
|      Date|month(Date)|
+----------+-----------+
|2010-01-04|          1|
|2010-01-05|          1|
+----------+-----------+
only showing top 2 rows



In [265]:
df.select(["Date", year(df["Date"])]).show(2)

+----------+----------+
|      Date|year(Date)|
+----------+----------+
|2010-01-04|      2010|
|2010-01-05|      2010|
+----------+----------+
only showing top 2 rows



In [275]:
df.select([year(df["Date"]).alias("year")]).groupby("year").count().orderBy("year").show()

+----+-----+
|year|count|
+----+-----+
|2010|  252|
|2011|  252|
|2012|  250|
|2013|  252|
|2014|  252|
|2015|  252|
|2016|  252|
+----+-----+



In [284]:
avg_vol_per_year = df.select([year(df["Date"]).alias("year"), "Volume"]).groupby("year").agg({"Volume":"avg"}).orderBy("year")
avg_vol_per_year.show()

+----+--------------------+
|year|         avg(Volume)|
+----+--------------------+
|2010|1.4982631666666666E8|
|2011|1.2307474166666667E8|
|2012|       1.319642044E8|
|2013|          1.016087E8|
|2014| 6.315273055555555E7|
|2015|  5.18378869047619E7|
|2016|  3.84153623015873E7|
+----+--------------------+



In [290]:
avg_vol_per_year.select(["year", format_number("avg(Volume)", 2)]).show()

+----+-----------------------------+
|year|format_number(avg(Volume), 2)|
+----+-----------------------------+
|2010|               149,826,316.67|
|2011|               123,074,741.67|
|2012|               131,964,204.40|
|2013|               101,608,700.00|
|2014|                63,152,730.56|
|2015|                51,837,886.90|
|2016|                38,415,362.30|
+----+-----------------------------+

