In [3]:
import os
import sys
import pyspark 

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Basic training


In [7]:
from pyspark import SparkContext # type: ignore

In [8]:
sc = SparkContext()

In [4]:
num = list(range(0,10000))

In [5]:
rdd_num = sc.parallelize(num)

In [6]:
rdd_num.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [7]:
sum(num)

49995000

In [8]:
rdd_num.reduce(lambda a,b: a+b)

49995000

In [9]:
rdd_num.reduce(min)

0

In [10]:
filtered_rdd_num =  rdd_num.filter(lambda x : x%2 == 0)

In [11]:
filtered_rdd_num.take(10)

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# SQL in Pyspark

## Read JSON

Hàm làm sạch

In [4]:
def clean_json_file(input_path, output_path):
    with open(input_path, 'r') as infile, open(output_path, 'w') as outfile:
        for line in infile:
            # Chỉ ghi các dòng không trống (dòng chứa dữ liệu)
            if line.strip():  # Loại bỏ dòng trống
                outfile.write(line)

In [5]:
from pyspark.sql import SQLContext

In [9]:
sqlContext = SQLContext(sc)



In [10]:
clean_json_file("C:\\Users\\Bao Kun\\Desktop\\PysparkTraining\\sample_users_with_id.json", "cleaned_input.json") #sample_users_with_id

# Đọc tệp JSON vào DataFrame
#df = spark.read.json("")
df = sqlContext.read.option("multiline", "true").json("cleaned_input.json")
df.show()

# Hiển thị các dòng trong DataFrame


# Kiểm tra kiểu dữ liệu của các cột
df.printSchema()


+--------------------+-----------------+--------------+-----------+----------+-------------+--------------------+------------+--------------+--------+--------------------+--------------------+
|          created_at|            email|email_verified|family_name|given_name|      last_ip|          last_login|logins_count|          name|nickname|          updated_at|             user_id|
+--------------------+-----------------+--------------+-----------+----------+-------------+--------------------+------------+--------------+--------+--------------------+--------------------+
|2016-11-28T14:10:...|    test@test.com|          true|       Test|     Hello|94.121.163.63|2016-12-02T01:17:...|          15| test@test.com|    test|2016-12-02T01:17:...|583c3ac3f38e84297...|
|2016-11-28T16:00:...|   test1@test.com|          true|      Test1|    Hello1|94.121.168.53|2016-11-28T16:00:...|           1|test1@test.com|   test1|2016-11-28T16:00:...|583c5484cb79a5fe5...|
|2016-11-28T16:12:...|      aaa@aaa

In [16]:
df.count()

5

In [17]:
df.select(df.created_at).show()

+--------------------+
|          created_at|
+--------------------+
|2016-11-28T14:10:...|
|2016-11-28T16:00:...|
|2016-11-28T16:12:...|
|2016-12-01T23:59:...|
|2016-12-09T12:01:...|
+--------------------+



In [18]:
df.take(10)

[Row(created_at='2016-11-28T14:10:11.338Z', email='test@test.com', email_verified=True, family_name='Test', given_name='Hello', last_ip='94.121.163.63', last_login='2016-12-02T01:17:29.310Z', logins_count=15, name='test@test.com', nickname='test', updated_at='2016-12-02T01:17:29.310Z', user_id='583c3ac3f38e84297c002546'),
 Row(created_at='2016-11-28T16:00:04.209Z', email='test1@test.com', email_verified=True, family_name='Test1', given_name='Hello1', last_ip='94.121.168.53', last_login='2016-11-28T16:00:47.203Z', logins_count=1, name='test1@test.com', nickname='test1', updated_at='2016-11-28T16:00:47.203Z', user_id='583c5484cb79a5fe593425a9'),
 Row(created_at='2016-11-28T16:12:23.777Z', email='aaa@aaa.com', email_verified=True, family_name='Dough', given_name='John', last_ip='94.121.168.53', last_login='2016-11-28T16:12:52.353Z', logins_count=2, name='aaa@aaa.com', nickname='aaa', updated_at='2016-11-28T16:12:52.353Z', user_id='583c57672c7686377d2f66c9'),
 Row(created_at='2016-12-01T23

## Code SQL trong python

In [None]:
df.createOrReplaceTempView("sql_df") # Đăng ký để sử dụng viết code SQL

In [19]:
sqlContext.sql("select * from sql_df").show(10)

+--------------------+-----------------+--------------+-----------+----------+-------------+--------------------+------------+--------------+--------+--------------------+--------------------+
|          created_at|            email|email_verified|family_name|given_name|      last_ip|          last_login|logins_count|          name|nickname|          updated_at|             user_id|
+--------------------+-----------------+--------------+-----------+----------+-------------+--------------------+------------+--------------+--------+--------------------+--------------------+
|2016-11-28T14:10:...|    test@test.com|          true|       Test|     Hello|94.121.163.63|2016-12-02T01:17:...|          15| test@test.com|    test|2016-12-02T01:17:...|583c3ac3f38e84297...|
|2016-11-28T16:00:...|   test1@test.com|          true|      Test1|    Hello1|94.121.168.53|2016-11-28T16:00:...|           1|test1@test.com|   test1|2016-11-28T16:00:...|583c5484cb79a5fe5...|
|2016-11-28T16:12:...|      aaa@aaa

In [16]:
sqlContext.sql('select email from sql_df').show(5)

+-----------------+
|            email|
+-----------------+
|    test@test.com|
|   test1@test.com|
|      aaa@aaa.com|
|          a@a.com|
|test9999@test.com|
+-----------------+



In [20]:
sqlContext.sql("select email, email_verified from sql_df where email_verified = 'false' ").show(10)

+-----------------+--------------+
|            email|email_verified|
+-----------------+--------------+
|test9999@test.com|         false|
+-----------------+--------------+



In [27]:
sc.stop()

## Read CSV

In [2]:
from pyspark.sql import SparkSession

# Khởi tạo Spark session
spark = SparkSession.builder \
    .appName("ReadCSVExample") \
    .getOrCreate()


In [12]:

# Đọc tệp CSV
df = spark.read.csv("C:\\Users\\Bao Kun\\Desktop\\PysparkTraining\\Store_CA.csv", header=True, inferSchema=True)

# Hiển thị dữ liệu
df.show(20)

+--------------+--------------+----------------+---------+------------------+--------+------------------+---------------+-----------------+-------------+-------------+-------------------+
|ProductVariety|MarketingSpend|CustomerFootfall|StoreSize|EmployeeEfficiency|StoreAge|CompetitorDistance|PromotionsCount|EconomicIndicator|StoreLocation|StoreCategory|MonthlySalesRevenue|
+--------------+--------------+----------------+---------+------------------+--------+------------------+---------------+-----------------+-------------+-------------+-------------------+
|           581|            29|            1723|      186|              84.9|       1|                12|              6|            108.3|  Los Angeles|  Electronics|              284.9|
|           382|            31|            1218|      427|              75.8|      18|                11|              6|             97.8|  Los Angeles|  Electronics|             308.21|
|           449|            35|            2654|      142|  

In [4]:
df.printSchema()

root
 |-- ProductVariety: integer (nullable = true)
 |-- MarketingSpend: integer (nullable = true)
 |-- CustomerFootfall: integer (nullable = true)
 |-- StoreSize: integer (nullable = true)
 |-- EmployeeEfficiency: double (nullable = true)
 |-- StoreAge: integer (nullable = true)
 |-- CompetitorDistance: integer (nullable = true)
 |-- PromotionsCount: integer (nullable = true)
 |-- EconomicIndicator: double (nullable = true)
 |-- StoreLocation: string (nullable = true)
 |-- StoreCategory: string (nullable = true)
 |-- MonthlySalesRevenue: double (nullable = true)



In [11]:
df.agg({'StoreSize':'mean','MonthlySalesRevenue':'mean'}).show()

+------------------+------------------------+
|    avg(StoreSize)|avg(MonthlySalesRevenue)|
+------------------+------------------------+
|272.99757575757576|       299.2532848484848|
+------------------+------------------------+



In [22]:
groupByStoreCategory_df = df.groupBy('StoreCategory').agg({'StoreSize':'mean','MonthlySalesRevenue':'mean'})

In [23]:
groupByStoreCategory_df = groupByStoreCategory_df.orderBy('avg(MonthlySalesRevenue)', ascending=False)

In [24]:
groupByStoreCategory_df.show()

+-------------+------------------+------------------------+
|StoreCategory|    avg(StoreSize)|avg(MonthlySalesRevenue)|
+-------------+------------------+------------------------+
|      Grocery|274.22487223168656|       301.3255706984667|
|     Clothing| 276.3806818181818|       300.8631250000001|
|  Electronics| 268.3121495327103|       295.3908037383178|
+-------------+------------------+------------------------+



In [25]:
df.take(10)

[Row(ProductVariety=581, MarketingSpend=29, CustomerFootfall=1723, StoreSize=186, EmployeeEfficiency=84.9, StoreAge=1, CompetitorDistance=12, PromotionsCount=6, EconomicIndicator=108.3, StoreLocation='Los Angeles', StoreCategory='Electronics', MonthlySalesRevenue=284.9),
 Row(ProductVariety=382, MarketingSpend=31, CustomerFootfall=1218, StoreSize=427, EmployeeEfficiency=75.8, StoreAge=18, CompetitorDistance=11, PromotionsCount=6, EconomicIndicator=97.8, StoreLocation='Los Angeles', StoreCategory='Electronics', MonthlySalesRevenue=308.21),
 Row(ProductVariety=449, MarketingSpend=35, CustomerFootfall=2654, StoreSize=142, EmployeeEfficiency=92.8, StoreAge=14, CompetitorDistance=11, PromotionsCount=6, EconomicIndicator=101.1, StoreLocation='Los Angeles', StoreCategory='Grocery', MonthlySalesRevenue=292.11),
 Row(ProductVariety=666, MarketingSpend=9, CustomerFootfall=2591, StoreSize=159, EmployeeEfficiency=66.3, StoreAge=11, CompetitorDistance=11, PromotionsCount=4, EconomicIndicator=115.1,

In [26]:
df.select('StoreCategory').distinct().show()

+-------------+
|StoreCategory|
+-------------+
|      Grocery|
|  Electronics|
|     Clothing|
+-------------+



In [27]:
df.select('ProductVariety', 'MarketingSpend' , 'CustomerFootfall').show()

+--------------+--------------+----------------+
|ProductVariety|MarketingSpend|CustomerFootfall|
+--------------+--------------+----------------+
|           581|            29|            1723|
|           382|            31|            1218|
|           449|            35|            2654|
|           666|             9|            2591|
|           657|            35|            2151|
|           182|            43|            1789|
|           341|            29|            1868|
|           500|            32|             786|
|           656|            30|            1352|
|           419|            48|            1949|
|           851|            10|            1501|
|           458|            26|            1673|
|           432|            18|            1901|
|           555|            40|            2528|
|           587|            37|            1797|
|           637|            46|            1345|
|           510|            17|            1556|
|           586|    

In [28]:
import time 

Đổi tên cột dùng lambda, có thể dùng withColumnRenamed để thay thế

In [31]:
start_time = time.time()

mapDic = { 'Grocery' : 'Tạp hóa', 'Electronics' : 'Đồ điện', 'Clothing' : 'Quần áo'}

data = df.rdd.map(lambda x: x.asDict()).map(
    lambda row: {**row, 'StoreCategory': mapDic.get(row['StoreCategory'], row['StoreCategory'])}
)

print("--- %s seconds ---" % (time.time() - start_time))

--- 0.0009987354278564453 seconds ---


In [33]:
data.take(10)

[{'ProductVariety': 581,
  'MarketingSpend': 29,
  'CustomerFootfall': 1723,
  'StoreSize': 186,
  'EmployeeEfficiency': 84.9,
  'StoreAge': 1,
  'CompetitorDistance': 12,
  'PromotionsCount': 6,
  'EconomicIndicator': 108.3,
  'StoreLocation': 'Los Angeles',
  'StoreCategory': 'Đồ điện',
  'MonthlySalesRevenue': 284.9},
 {'ProductVariety': 382,
  'MarketingSpend': 31,
  'CustomerFootfall': 1218,
  'StoreSize': 427,
  'EmployeeEfficiency': 75.8,
  'StoreAge': 18,
  'CompetitorDistance': 11,
  'PromotionsCount': 6,
  'EconomicIndicator': 97.8,
  'StoreLocation': 'Los Angeles',
  'StoreCategory': 'Đồ điện',
  'MonthlySalesRevenue': 308.21},
 {'ProductVariety': 449,
  'MarketingSpend': 35,
  'CustomerFootfall': 2654,
  'StoreSize': 142,
  'EmployeeEfficiency': 92.8,
  'StoreAge': 14,
  'CompetitorDistance': 11,
  'PromotionsCount': 6,
  'EconomicIndicator': 101.1,
  'StoreLocation': 'Los Angeles',
  'StoreCategory': 'Tạp hóa',
  'MonthlySalesRevenue': 292.11},
 {'ProductVariety': 666,
  '

In [34]:
spark.stop()