In [3]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 27 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 41.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=972b662868cca56dbf125960962683a2a1bbf990519bec87cc4efc24f7a8ab7f
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [4]:
from pyspark.sql import SparkSession
# Build the session used by every later cell (an already-running session
# is reused by getOrCreate()).
spark = (
    SparkSession.builder
    .appName('Spark')
    .getOrCreate()
)

In [5]:
# Display the active SparkSession object.
spark

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [7]:
# Sample employee records, one tuple per person:
# (ID, Name, Surname, Age, Salary).
liste = [
    (1, "Joseph", "Forest", 32, 12000),
    (2, "Tommy", "Happy", 38, 13000),
    (3, "Thomas", "Horse", 30, 11000),
    (4, "Strato", "Wise", 37, 15000),
]

In [None]:
# Alternative (not run): pass only the column names and let Spark infer the column types.
# columns = ["ID","Name","Surname","Age","Salary"]
# df = spark.createDataFrame(liste, columns)

In [8]:
# Explicit schema for the sample rows: ID is declared non-nullable,
# every other column is nullable.
list_schema = (
    StructType()
    .add("ID", IntegerType(), False)
    .add("Name", StringType(), True)
    .add("Surname", StringType(), True)
    .add("Age", IntegerType(), True)
    .add("Salary", IntegerType(), True)
)

In [9]:
# Build the DataFrame from the sample rows, enforcing the explicit schema.
sample_df = spark.createDataFrame(liste, list_schema)

In [12]:
# Bare DataFrame repr: shows only column names and types, no rows.
sample_df

DataFrame[ID: int, Name: string, Surname: string, Age: int, Salary: int]

In [13]:
# Print the rows as a text table.
sample_df.show()

+---+------+-------+---+------+
| ID|  Name|Surname|Age|Salary|
+---+------+-------+---+------+
|  1|Joseph| Forest| 32| 12000|
|  2| Tommy|  Happy| 38| 13000|
|  3|Thomas|  Horse| 30| 11000|
|  4|Strato|   Wise| 37| 15000|
+---+------+-------+---+------+



In [14]:
# Print each column's type and nullability.
sample_df.printSchema()

root
 |-- ID: integer (nullable = false)
 |-- Name: string (nullable = true)
 |-- Surname: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [15]:
# Fetch the first two rows as a list of Row objects.
sample_df.head(2)

[Row(ID=1, Name='Joseph', Surname='Forest', Age=32, Salary=12000),
 Row(ID=2, Name='Tommy', Surname='Happy', Age=38, Salary=13000)]

In [16]:
# List the column names.
sample_df.columns

['ID', 'Name', 'Surname', 'Age', 'Salary']

In [17]:
# Summary statistics (count, mean, stddev, min, max) for every column.
sample_df.describe().show()

+-------+------------------+------+-------+------------------+-----------------+
|summary|                ID|  Name|Surname|               Age|           Salary|
+-------+------------------+------+-------+------------------+-----------------+
|  count|                 4|     4|      4|                 4|                4|
|   mean|               2.5|  null|   null|             34.25|          12750.0|
| stddev|1.2909944487358056|  null|   null|3.8622100754188224|1707.825127659933|
|    min|                 1|Joseph| Forest|                30|            11000|
|    max|                 4| Tommy|   Wise|                38|            15000|
+-------+------------------+------+-------+------------------+-----------------+



In [18]:
# Project just the Name and Salary columns.
sample_df.select("Name", "Salary").show()

+------+------+
|  Name|Salary|
+------+------+
|Joseph| 12000|
| Tommy| 13000|
|Thomas| 11000|
|Strato| 15000|
+------+------+



In [19]:
# Read one field of the first row. first() fetches a single Row, and a Row
# can be indexed by column name directly, so there is no need to fetch two
# rows and convert one of them to a dict.
sample_df.first()["Surname"]

'Forest'

In [20]:
# Derive a new column from Salary; multiplying by a float yields a
# floating-point column (see the 14400.0-style values in the output).
sample_df.withColumn("new_salary", sample_df.Salary * 1.2).show()

+---+------+-------+---+------+----------+
| ID|  Name|Surname|Age|Salary|new_salary|
+---+------+-------+---+------+----------+
|  1|Joseph| Forest| 32| 12000|   14400.0|
|  2| Tommy|  Happy| 38| 13000|   15600.0|
|  3|Thomas|  Horse| 30| 11000|   13200.0|
|  4|Strato|   Wise| 37| 15000|   18000.0|
+---+------+-------+---+------+----------+



In [21]:
from pyspark.sql.functions import col, lit

In [22]:
# Add a constant-valued column: lit() wraps the Python string as a column expression.
sample_df.withColumn("Country",lit("TR")).show()

+---+------+-------+---+------+-------+
| ID|  Name|Surname|Age|Salary|Country|
+---+------+-------+---+------+-------+
|  1|Joseph| Forest| 32| 12000|     TR|
|  2| Tommy|  Happy| 38| 13000|     TR|
|  3|Thomas|  Horse| 30| 11000|     TR|
|  4|Strato|   Wise| 37| 15000|     TR|
+---+------+-------+---+------+-------+



In [23]:
# from pyspark.sql.functions import when

In [24]:
from pyspark.sql.functions import when, col


In [25]:
# Conditional column: "High" when Salary is strictly above 12000, otherwise "Normal".
salary_level = when(col("Salary") > 12000, lit("High")).otherwise(lit("Normal"))
sample_df.withColumn("Salary Level", salary_level).show()

+---+------+-------+---+------+------------+
| ID|  Name|Surname|Age|Salary|Salary Level|
+---+------+-------+---+------+------------+
|  1|Joseph| Forest| 32| 12000|      Normal|
|  2| Tommy|  Happy| 38| 13000|        High|
|  3|Thomas|  Horse| 30| 11000|      Normal|
|  4|Strato|   Wise| 37| 15000|        High|
+---+------+-------+---+------+------------+



In [26]:
# Show the DataFrame with column ID renamed to Number (sample_df itself still has ID afterwards).
sample_df.withColumnRenamed("ID","Number").show()

+------+------+-------+---+------+
|Number|  Name|Surname|Age|Salary|
+------+------+-------+---+------+
|     1|Joseph| Forest| 32| 12000|
|     2| Tommy|  Happy| 38| 13000|
|     3|Thomas|  Horse| 30| 11000|
|     4|Strato|   Wise| 37| 15000|
+------+------+-------+---+------+



In [27]:
# Show the DataFrame without the ID column.
sample_df.drop("ID").show()

+------+-------+---+------+
|  Name|Surname|Age|Salary|
+------+-------+---+------+
|Joseph| Forest| 32| 12000|
| Tommy|  Happy| 38| 13000|
|Thomas|  Horse| 30| 11000|
|Strato|   Wise| 37| 15000|
+------+-------+---+------+



In [31]:
# Materialize all rows as a Python list of Row objects.
df_collect = sample_df.collect()

In [32]:
# Inspect the collected rows.
df_collect

[Row(ID=1, Name='Joseph', Surname='Forest', Age=32, Salary=12000),
 Row(ID=2, Name='Tommy', Surname='Happy', Age=38, Salary=13000),
 Row(ID=3, Name='Thomas', Surname='Horse', Age=30, Salary=11000),
 Row(ID=4, Name='Strato', Surname='Wise', Age=37, Salary=15000)]

In [34]:
# Print "Name Surname" for every collected row.
for person in df_collect:
    print(person.Name + " " + person.Surname)

Joseph Forest
Tommy Happy
Thomas Horse
Strato Wise


In [35]:
# Register the DataFrame as a temporary view so it can be queried by name through spark.sql.
sample_df.createOrReplaceTempView("personel")

In [36]:
# Without .show() the cell displays only the resulting DataFrame's column names and types.
spark.sql("SELECT * FROM personel WHERE Salary>12000")

DataFrame[ID: int, Name: string, Surname: string, Age: int, Salary: int]

In [38]:
# Run the SQL query against the temporary view and print the matching rows.
query = spark.sql("SELECT * FROM personel WHERE Salary>12000")
query.show()

+---+------+-------+---+------+
| ID|  Name|Surname|Age|Salary|
+---+------+-------+---+------+
|  2| Tommy|  Happy| 38| 13000|
|  4|Strato|   Wise| 37| 15000|
+---+------+-------+---+------+



In [39]:
# filter() accepts a SQL-style condition given as a string.
sample_df.filter("Age<38").show()

+---+------+-------+---+------+
| ID|  Name|Surname|Age|Salary|
+---+------+-------+---+------+
|  1|Joseph| Forest| 32| 12000|
|  3|Thomas|  Horse| 30| 11000|
|  4|Strato|   Wise| 37| 15000|
+---+------+-------+---+------+



In [40]:
# Same age condition written as a column expression, then projected to two columns.
sample_df.where(sample_df.Age < 38).select("Name", "Salary").show()

+------+------+
|  Name|Salary|
+------+------+
|Joseph| 12000|
|Thomas| 11000|
|Strato| 15000|
+------+------+



In [41]:
# Combine two column conditions with & (each side is its own column expression).
under_38 = sample_df.Age < 38
below_15000 = sample_df.Salary < 15000
sample_df.where(under_38 & below_15000).select("Name", "Salary").show()

+------+------+
|  Name|Salary|
+------+------+
|Joseph| 12000|
|Thomas| 11000|
+------+------+



In [44]:
# Read sales.csv, taking column names from the header row and letting Spark infer column types.
sales_df = spark.read.csv("sales.csv",header=True, inferSchema=True)
sales_df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [46]:
# Check the inferred column types.
sales_df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [47]:
# Per-company maximum; with no columns named, the result covers the numeric column(s) — only Sales here.
sales_df.groupBy("Company").max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [48]:
# Overall mean via a {column: function} dict; the lowercase "sales" still resolves to the Sales column.
sales_df.agg({"sales":"mean"}).show()

+-----------------+
|       avg(sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [49]:
# Keep the grouped object in a variable, then aggregate it with the dict form.
groupdf = sales_df.groupBy("Company")
groupdf.agg({"sales":"max"}).show()

+-------+----------+
|Company|max(sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [50]:
sales_df.groupBy("Company").count().orderBy("count").show() # rows per company, ascending — the counterpart of pandas value_counts()

+-------+-----+
|Company|count|
+-------+-----+
|     FB|    2|
|   GOOG|    3|
|   MSFT|    3|
|   APPL|    4|
+-------+-----+



In [51]:
from pyspark.sql.functions import countDistinct, avg, stddev, format_number
# Same overall mean using the avg() function, with a readable column name.
sales_df.select(avg("sales").alias("Average Sales")).show()

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [52]:
# Compute the overall average, then format it to two decimal places.
# The formatted column keeps the "Average Sales" label (the alias was
# previously misspelled as "Averga Sales").
avg_sale = sales_df.select(avg("sales").alias("Average Sales"))
avg_sale.select(format_number("Average Sales", 2).alias("Average Sales")).show()

+------------+
|Averga Sales|
+------------+
|      360.58|
+------------+



In [53]:
# Sort ascending by Sales.
sales_df.orderBy("Sales").show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [54]:
# Sort descending by Sales.
sales_df.orderBy(sales_df["Sales"].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [55]:
# Column names for the rows below.
columns2 = ["ID", "Name", "Surname", "Age", "Salary"]

# Sample rows with deliberately missing values (None) in Surname, Age and Salary.
liste2 = [
    (1, "Joseph", "Forest", 32, None),
    (2, "Tommy", None, 38, 13000),
    (3, "Thomas", None, None, None),
    (4, "Strato", "Wise", 37, 15000),
]

# Only column names are passed here (no explicit schema), so the column types are inferred from the data.
missing_df = spark.createDataFrame(liste2,columns2)

In [56]:
# Print the rows; missing values appear as null.
missing_df.show()

+---+------+-------+----+------+
| ID|  Name|Surname| Age|Salary|
+---+------+-------+----+------+
|  1|Joseph| Forest|  32|  null|
|  2| Tommy|   null|  38| 13000|
|  3|Thomas|   null|null|  null|
|  4|Strato|   Wise|  37| 15000|
+---+------+-------+----+------+



In [57]:
# Drop every row that contains at least one null.
missing_df.na.drop().show()

+---+------+-------+---+------+
| ID|  Name|Surname|Age|Salary|
+---+------+-------+---+------+
|  4|Strato|   Wise| 37| 15000|
+---+------+-------+---+------+



In [58]:
# Drop only the rows whose Salary is null; nulls in other columns are kept.
missing_df.na.drop(subset=["Salary"]).show()

+---+------+-------+---+------+
| ID|  Name|Surname|Age|Salary|
+---+------+-------+---+------+
|  2| Tommy|   null| 38| 13000|
|  4|Strato|   Wise| 37| 15000|
+---+------+-------+---+------+



In [59]:
# A string fill value only replaces nulls in string columns (Surname here); numeric nulls stay null.
missing_df.na.fill("fill value").show()

+---+------+----------+----+------+
| ID|  Name|   Surname| Age|Salary|
+---+------+----------+----+------+
|  1|Joseph|    Forest|  32|  null|
|  2| Tommy|fill value|  38| 13000|
|  3|Thomas|fill value|null|  null|
|  4|Strato|      Wise|  37| 15000|
+---+------+----------+----+------+



In [60]:
# A numeric fill value only replaces nulls in numeric columns (Age and Salary here); string nulls stay null.
missing_df.na.fill(0).show()

+---+------+-------+---+------+
| ID|  Name|Surname|Age|Salary|
+---+------+-------+---+------+
|  1|Joseph| Forest| 32|     0|
|  2| Tommy|   null| 38| 13000|
|  3|Thomas|   null|  0|     0|
|  4|Strato|   Wise| 37| 15000|
+---+------+-------+---+------+



In [61]:
from pyspark.sql.functions import mean
# collect() returns a one-row list; [0][0] extracts the single mean value as a Python number.
avg_sal = missing_df.select(mean("Salary")).collect()
mean_sal = avg_sal[0][0]

In [62]:
# Inspect the computed mean (nulls are not counted: (13000 + 15000) / 2).
mean_sal

14000.0

In [63]:
# Replace null salaries with the mean. The float 14000.0 is shown as 14000 in the output —
# presumably cast to the column's integer type, so a fractional mean would lose its decimals (TODO confirm).
missing_df.na.fill(mean_sal, subset="Salary").show()

+---+------+-------+----+------+
| ID|  Name|Surname| Age|Salary|
+---+------+-------+----+------+
|  1|Joseph| Forest|  32| 14000|
|  2| Tommy|   null|  38| 13000|
|  3|Thomas|   null|null| 14000|
|  4|Strato|   Wise|  37| 15000|
+---+------+-------+----+------+



In [64]:
# Read apple.csv, taking column names from the header row and letting Spark infer column types.
stock_df = spark.read.csv("apple.csv", header=True, inferSchema=True )
stock_df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [65]:
from pyspark.sql.functions import year

In [66]:
# Extract the year from the Date column.
stock_df.select(year("Date")).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [59]:
# Append the extracted year as a new Year column.
stock_df.withColumn("Year", year("Date")).show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
|2010-01-11|212.

In [67]:
# Add the Year column, then keep only Year and Close.
stock_df.withColumn("Year", year("Date")).select(["Year", "Close"]).show()

+----+------------------+
|Year|             Close|
+----+------------------+
|2010|        214.009998|
|2010|        214.379993|
|2010|        210.969995|
|2010|            210.58|
|2010|211.98000499999998|
|2010|210.11000299999998|
|2010|        207.720001|
|2010|        210.650002|
|2010|            209.43|
|2010|            205.93|
|2010|        215.039995|
|2010|            211.73|
|2010|        208.069996|
|2010|            197.75|
|2010|        203.070002|
|2010|        205.940001|
|2010|        207.880005|
|2010|        199.289995|
|2010|        192.060003|
|2010|        194.729998|
+----+------------------+
only showing top 20 rows



In [68]:
# Keep the Year/Close projection in a variable for the per-year aggregations that follow.
new_stock = stock_df.withColumn("Year", year("Date")).select(["Year", "Close"])
new_stock

DataFrame[Year: int, Close: double]

In [69]:
# Average closing price per year. Naming the column in mean() aggregates
# only Close; a bare .mean() averages every numeric column and then needs
# an extra select to discard the unwanted ones.
# Row order is not guaranteed without an explicit orderBy.
new_stock.groupBy("Year").mean("Close").withColumnRenamed("avg(Close)", "Average Close").show()

+----+------------------+
|Year|     Average Close|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



In [70]:
# Average closing price per year, sorted chronologically. Naming the column
# in mean() aggregates only Close; a bare .mean() averages every numeric
# column and then needs an extra select to discard the unwanted ones.
(
    new_stock.groupBy("Year")
    .mean("Close")
    .withColumnRenamed("avg(Close)", "Average Close")
    .orderBy("Year")
    .show()
)

+----+------------------+
|Year|     Average Close|
+----+------------------+
|2010| 259.8424600000002|
|2011|364.00432532142867|
|2012| 576.0497195640002|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2015|120.03999980555547|
|2016|104.60400786904763|
+----+------------------+

