In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=83277f59d352eb6d5905f37f43c2cf3aa8404acc3753f97256e4b0b106d346e7
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [2]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession


In [None]:
pd.read_csv("/content/sample_data/test1.csv")

Unnamed: 0,Name,age
0,Krish,31
1,Su,30
2,Sunny,29


In [3]:
spark=SparkSession.builder.appName('Practice').getOrCreate()

In [None]:
spark

In [6]:
df_spark=spark.read.csv("/content/sample_data/test1.csv")


In [7]:
df_spark=spark.read.option("header",True).option("inferSchema",True).csv("/content/sample_data/test1.csv")

In [None]:
df_spark.show()

+-----+----+----------+------+
| Name| age|Experience|Salary|
+-----+----+----------+------+
|Krish|  31|        10| 30000|
|   Su|  30|         8| 25000|
|Sunny|  29|         4| 20000|
| Paul|  24|         3| 20000|
|  Har|  21|         1| 15000|
|  Shu|  23|         2| 18000|
|  Mah|NULL|      NULL| 40000|
| NULL|  34|        10| 38000|
| NULL|  36|      NULL|  NULL|
+-----+----+----------+------+



In [None]:
type(df_spark)

In [10]:
# check the schema
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [None]:
df_spark.columns

['Name', 'age', 'Experience']

In [None]:
df_spark.select(["Name","age"])

DataFrame[Name: string, age: int]

In [None]:
indices = [0, 2]
columns_to_select = [df_spark.columns[i] for i in indices]
columns_to_select
df_spark.select(columns_to_select).show()

['Name', 'Experience']

In [9]:
df_spark.describe().show()

+-------+-----+------------------+-----------------+------------------+
|summary| Name|               age|       Experience|            Salary|
+-------+-----+------------------+-----------------+------------------+
|  count|    6|                 6|                6|                 6|
|   mean| NULL|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev| NULL| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|  Har|                21|                1|             15000|
|    max|Sunny|                31|               10|             30000|
+-------+-----+------------------+-----------------+------------------+



In [None]:
# adding the columns
df_spark=df_spark.withColumn('Experience after 2 years',df_spark["Experience"]+2)

In [None]:
df_spark.show()

+-----+---+----------+------------------------+
| Name|age|Experience|Experience after 2 years|
+-----+---+----------+------------------------+
|Krish| 31|        10|                      12|
|   Su| 30|         8|                      10|
|Sunny| 29|         4|                       6|
+-----+---+----------+------------------------+



In [None]:
df_spark.drop(df_spark["Experience after 2 years"])

DataFrame[Name: string, age: int, Experience: int]

In [None]:
# rename the col
df_spark.withColumnRenamed("Name","New Name")

DataFrame[New Name: string, age: int, Experience: int, Experience after 2 years: int]

In [None]:
df_spark.na.drop(thresh=2).show()
# how="any" one of the values are null
# how="all" all the values are null
# thresh=2 counts of non-null which are less than 2 will be droped.

+-----+----+----------+------+
| Name| age|Experience|Salary|
+-----+----+----------+------+
|Krish|  31|        10| 30000|
|   Su|  30|         8| 25000|
|Sunny|  29|         4| 20000|
| Paul|  24|         3| 20000|
|  Har|  21|         1| 15000|
|  Shu|  23|         2| 18000|
|  Mah|NULL|      NULL| 40000|
| NULL|  34|        10| 38000|
+-----+----+----------+------+



In [None]:
## subset
df_spark.na.drop(subset=["Experience"]).show()

+-----+---+----------+------+
| Name|age|Experience|Salary|
+-----+---+----------+------+
|Krish| 31|        10| 30000|
|   Su| 30|         8| 25000|
|Sunny| 29|         4| 20000|
| Paul| 24|         3| 20000|
|  Har| 21|         1| 15000|
|  Shu| 23|         2| 18000|
| NULL| 34|        10| 38000|
+-----+---+----------+------+



In [None]:
## fill na value
## data type is int cant be replaced by string abc
df_spark.na.fill('abc').show()

+-----+----+----------+------+
| Name| age|Experience|Salary|
+-----+----+----------+------+
|Krish|  31|        10| 30000|
|   Su|  30|         8| 25000|
|Sunny|  29|         4| 20000|
| Paul|  24|         3| 20000|
|  Har|  21|         1| 15000|
|  Shu|  23|         2| 18000|
|  Mah|NULL|      NULL| 40000|
|  abc|  34|        10| 38000|
|  abc|  36|      NULL|  NULL|
+-----+----+----------+------+



In [None]:
from pyspark.sql.functions import col, mean
mean_age = df_spark.select(mean(col("age"))).collect()[0][0]

# Fill missing values in the 'age' column with the calculated mean
df_filled = df_spark.fillna({'age': mean_age})


In [None]:
df_filled.show()

+-----+---+----------+------+
| Name|age|Experience|Salary|
+-----+---+----------+------+
|Krish| 31|        10| 30000|
|   Su| 30|         8| 25000|
|Sunny| 29|         4| 20000|
| Paul| 24|         3| 20000|
|  Har| 21|         1| 15000|
|  Shu| 23|         2| 18000|
|  Mah| 28|      NULL| 40000|
| NULL| 34|        10| 38000|
| NULL| 36|      NULL|  NULL|
+-----+---+----------+------+



# Filter

In [13]:
df_spark.filter("Salary<=20000").select(["Name","age"]).show()

+-----+---+
| Name|age|
+-----+---+
|Sunny| 29|
| Paul| 24|
|  Har| 21|
|  Shu| 23|
+-----+---+



In [12]:
df_spark.filter(df_spark["Salary"]<=20000).select(["Name","age"]).show()

+-----+---+
| Name|age|
+-----+---+
|Sunny| 29|
| Paul| 24|
|  Har| 21|
|  Shu| 23|
+-----+---+



In [17]:
df_spark.filter((df_spark.Salary<=20000)&(df_spark.Salary>15000)) .select(["Name","age"]).show()

+-----+---+
| Name|age|
+-----+---+
|Sunny| 29|
| Paul| 24|
|  Shu| 23|
+-----+---+



In [18]:
# 反向选择
df_spark.filter(~(df_spark.Salary<=20000)&(df_spark.Salary>15000)) .select(["Name","age"]).show()

+-----+---+
| Name|age|
+-----+---+
|Krish| 31|
|   Su| 30|
+-----+---+



Pyspark GroupBy And Aggregate Functions

In [19]:
spark=SparkSession.builder.appName('Agg').getOrCreate()

In [26]:
df_pyspark1=spark.read.csv('/content/sample_data/test3.csv',header=True,inferSchema=True)

In [27]:
## Groupby
### Grouped to find the maximum salary
df_pyspark1.groupBy("Name").sum().show()
#df_pyspark1.groupBy("Name").mean().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [28]:
df_pyspark1.groupBy("Name").mean().show()

+---------+------------------+
|     Name|       avg(salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+



In [29]:
df_pyspark1.groupBy("Name").count().show()

+---------+-----+
|     Name|count|
+---------+-----+
|Sudhanshu|    3|
|    Sunny|    2|
|    Krish|    3|
|   Mahesh|    2|
+---------+-----+



In [30]:
df_pyspark1.groupBy("Name").min().show()

+---------+-----------+
|     Name|min(salary)|
+---------+-----------+
|Sudhanshu|       5000|
|    Sunny|       2000|
|    Krish|       4000|
|   Mahesh|       3000|
+---------+-----------+



In [31]:
df_pyspark1.agg({"Salary":"sum"}).show()
# agg_df = df.groupBy("Department").agg(
#     sum("Salary").alias("TotalSalary"),
#     avg("Salary").alias("AverageSalary"),
#     count("EmployeeName").alias("EmployeeCount"),
#     max("Salary").alias("MaxSalary"),
#     min("Salary").alias("MinSalary")

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



In [33]:
!git config --global user.name "YZehui"
!git config --global user.email "yzhhelen@gmail.com"


In [45]:
!git clone https://YZehui/YZehui.git



Cloning into 'YZehui'...
fatal: unable to access 'https://YZehui/YZehui.git/': Could not resolve host: YZehui


In [44]:

%cd YZehui


[Errno 2] No such file or directory: 'YZehui'
/content/Practice


In [46]:
!git add .
