In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): still running...
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285416 sha256=fd873aacb8365e30f98f87a179fcaa0fe716ced4aee1e88b3db2385018920e0a
  Stored in directory: c:\users\faqih\appdata\local\pip\cache\wheels\07\9f\04\fc2c478c8c87334f0ff48e86fa7d8c2a814f9df5dc21d79b7e
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.1
Note: you may need to restart the kernel to use updated packages.


In [4]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName("hellowspark")\
        .getOrCreate()

In [5]:
df = spark.read.csv("titanic.csv", header=True, inferSchema=True)

In [10]:
df.head(2)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'),
 Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C')]

In [11]:
df.show(2)

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|  C85|       C|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
only showing top 2 rows



In [12]:
# show schema 
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [13]:
# show dataset shape 
print("jumlah baris : ", df.count())
print("jumlah kolom : ", len(df.columns))

jumlah baris :  891
jumlah kolom :  12


## Select

In [14]:
df.select("Name", "Sex", "Age", "Fare").show(5)

+--------------------+------+----+-------+
|                Name|   Sex| Age|   Fare|
+--------------------+------+----+-------+
|Braund, Mr. Owen ...|  male|22.0|   7.25|
|Cumings, Mrs. Joh...|female|38.0|71.2833|
|Heikkinen, Miss. ...|female|26.0|  7.925|
|Futrelle, Mrs. Ja...|female|35.0|   53.1|
|Allen, Mr. Willia...|  male|35.0|   8.05|
+--------------------+------+----+-------+
only showing top 5 rows



In [15]:
# select distinct 
df.select("Sex").distinct().show()

+------+
|   Sex|
+------+
|female|
|  male|
+------+



## Filter 

In [17]:
# show pessanger > 77
df.filter("Age > 77").show()

+-----------+--------+------+--------------------+----+----+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+------+----+-----+--------+
|        631|       1|     1|Barkworth, Mr. Al...|male|80.0|    0|    0| 27042|30.0|  A23|       S|
+-----------+--------+------+--------------------+----+----+-----+-----+------+----+-----+--------+



## Data aggregation

In [18]:
from pyspark.sql import functions as F

# melihat jumlah penumpang yang survived dan tidak
df.groupBy("Sex").agg(F.count("Sex")).show()

+------+----------+
|   Sex|count(Sex)|
+------+----------+
|female|       314|
|  male|       577|
+------+----------+



In [19]:
# Atau misal ingin melihat total fare berdasarkan kelas penumpang
df.groupBy("Pclass").agg(F.sum("Fare")).show()

+------+------------------+
|Pclass|         sum(Fare)|
+------+------------------+
|     1|18177.412499999984|
|     3| 6714.695100000002|
|     2|3801.8416999999995|
+------+------------------+



## Convert to Pandas 

In [20]:
FareByPclass = df.groupBy("Pclass").agg(F.sum("Fare"))
df_pandas = FareByPclass.toPandas()

print(type(df_pandas))

<class 'pandas.core.frame.DataFrame'>


In [21]:
df_pandas

Unnamed: 0,Pclass,sum(Fare)
0,1,18177.4125
1,3,6714.6951
2,2,3801.8417


## SQL Table

In [22]:
table_name = "titanic"
df.createOrReplaceTempView(table_name)

In [25]:
spark.sql("select * from titanic").show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [26]:
spark.sql("select * from titanic where Age > 77").show()

+-----------+--------+------+--------------------+----+----+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+------+----+-----+--------+
|        631|       1|     1|Barkworth, Mr. Al...|male|80.0|    0|    0| 27042|30.0|  A23|       S|
+-----------+--------+------+--------------------+----+----+-----+-----+------+----+-----+--------+



In [28]:
spark.sql("select Survived, count(*) as total from titanic group by Survived").show()

+--------+-----+
|Survived|total|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+

