In [None]:
from pyspark.sql import SparkSession

# set up jdbc driver for connect database
spark = SparkSession.builder.config('spark.jars.packages', 'org.xerial:sqlite-jdbc:3.46.0.0').getOrCreate()

In [77]:
df = spark.read.format('jdbc').options(driver='org.sqlite.JDBC', dbtable='bank',url='jdbc:sqlite:/workspace/bank.db').load()

In [78]:
df.show()

+--------+-------------+---------+-------+-----------------+
|    Date|       Domain| Location|  Value|Transaction_count|
+--------+-------------+---------+-------+-----------------+
|1/1/2022|    RESTRAUNT|     Bhuj| 365554|             1932|
|1/1/2022|  INVESTMENTS| Ludhiana| 847444|             1721|
|1/1/2022|       RETAIL|      Goa| 786941|             1573|
|1/1/2022|INTERNATIONAL|  Mathura| 368610|             2049|
|1/1/2022|    RESTRAUNT|  Madurai| 615681|             1519|
|1/1/2022|INTERNATIONAL|    Daman|1191092|             1813|
|1/1/2022|INTERNATIONAL|    Buxar| 968883|             2098|
|1/1/2022|       PUBLIC|   Trichy|1030297|              606|
|1/1/2022|    RESTRAUNT|    Kullu| 688655|             1463|
|1/1/2022|      MEDICAL|Hyderabad|1174302|             1463|
|1/1/2022|       PUBLIC|  Lucknow| 912902|             1035|
|1/1/2022|  INVESTMENTS|  Bikaner| 436534|             1093|
|1/1/2022|       PUBLIC| Amritsar| 849803|             2013|
|1/1/2022|  INVESTMENTS|

In [79]:
df.count()

1004480

In [80]:
# Create operation
from pyspark.sql import Row

# Create a new row
new_row = Row(Date='2/1/2023', Domain='INTERNATIONAL', Location='Bombay', Value=552250, Transaction_count=2345)

# Convert the Row into a DataFrame
new_df = spark.createDataFrame([new_row], schema=df.schema)

# Append the new DataFrame to the existing DataFrame
df = df.union(new_df)

# Show the updated DataFrame
df.tail(10)

                                                                                

[Row(Date='12/31/2022', Domain='EDUCATION', Location='Konark', Value=338091, Transaction_count=2084),
 Row(Date='12/31/2022', Domain='MEDICAL', Location='Buxar', Value=501769, Transaction_count=2015),
 Row(Date='12/31/2022', Domain='EDUCATION', Location='Bidar', Value=1139388, Transaction_count=2085),
 Row(Date='12/31/2022', Domain='RETAIL', Location='Varanasi', Value=709220, Transaction_count=1309),
 Row(Date='12/31/2022', Domain='INTERNATIONAL', Location='Konark', Value=738557, Transaction_count=754),
 Row(Date='12/31/2022', Domain='INVESTMENTS', Location='Pune', Value=668180, Transaction_count=1572),
 Row(Date='12/31/2022', Domain='RETAIL', Location='Durg', Value=1000349, Transaction_count=2155),
 Row(Date='12/31/2022', Domain='INTERNATIONAL', Location='Mathura', Value=1026547, Transaction_count=2458),
 Row(Date='12/31/2022', Domain='RETAIL', Location='Patiala', Value=763159, Transaction_count=992),
 Row(Date='2/1/2023', Domain='INTERNATIONAL', Location='Bombay', Value=552250, Trans

In [83]:
# Read operation
filtered_df = df.filter(df.Transaction_count > 2000)

# Search the number of cities has more than 2000 transactions
filtered_df.show()
print("Số thành phố có trên 2000 giao dịch: ", filtered_df.count())

+--------+-------------+---------+-------+-----------------+
|    Date|       Domain| Location|  Value|Transaction_count|
+--------+-------------+---------+-------+-----------------+
|1/1/2022|INTERNATIONAL|  Mathura| 368610|             2049|
|1/1/2022|INTERNATIONAL|    Buxar| 968883|             2098|
|1/1/2022|       PUBLIC| Amritsar| 849803|             2013|
|1/1/2022|  INVESTMENTS|  Mathura|1180043|             2068|
|1/1/2022|       RETAIL|  Lunglei| 521003|             2304|
|1/1/2022|  INVESTMENTS|   Kannur| 770080|             2431|
|1/1/2022|       RETAIL| Amritsar| 907177|             2042|
|1/1/2022|       RETAIL|    Bhind| 555766|             2035|
|1/1/2022|    RESTRAUNT|   Bhopal|1155932|             2082|
|1/1/2022|      MEDICAL|Hyderabad| 753346|             2427|
|1/1/2022|    EDUCATION| Banglore| 379817|             2196|
|1/1/2022|    EDUCATION| Amritsar| 799260|             2430|
|1/1/2022|  INVESTMENTS|   Trichy|1074381|             2467|
|1/1/2022|      MEDICAL|



255861


                                                                                

In [82]:
# Update operation
from pyspark.sql.functions import when

# Update the last row column with Value = 500000 and Transaction_count = 2000
df = df.withColumn('Value', when((df.Date == '2/1/2023') & 
                                 (df.Domain == 'INTERNATIONAL') & 
                                 (df.Location == 'Bombay'), 50000).otherwise(df.Value)) \
               .withColumn('Transaction_count', when((df.Date == '2/1/2023') & 
                                                     (df.Domain == 'INTERNATIONAL') & 
                                                     (df.Location == 'Bombay'), 2000).otherwise(df.Transaction_count))

# Show the updated DataFrame
df.tail(10)

                                                                                

[Row(Date='12/31/2022', Domain='EDUCATION', Location='Konark', Value=338091, Transaction_count=2084),
 Row(Date='12/31/2022', Domain='MEDICAL', Location='Buxar', Value=501769, Transaction_count=2015),
 Row(Date='12/31/2022', Domain='EDUCATION', Location='Bidar', Value=1139388, Transaction_count=2085),
 Row(Date='12/31/2022', Domain='RETAIL', Location='Varanasi', Value=709220, Transaction_count=1309),
 Row(Date='12/31/2022', Domain='INTERNATIONAL', Location='Konark', Value=738557, Transaction_count=754),
 Row(Date='12/31/2022', Domain='INVESTMENTS', Location='Pune', Value=668180, Transaction_count=1572),
 Row(Date='12/31/2022', Domain='RETAIL', Location='Durg', Value=1000349, Transaction_count=2155),
 Row(Date='12/31/2022', Domain='INTERNATIONAL', Location='Mathura', Value=1026547, Transaction_count=2458),
 Row(Date='12/31/2022', Domain='RETAIL', Location='Patiala', Value=763159, Transaction_count=992),
 Row(Date='2/1/2023', Domain='INTERNATIONAL', Location='Bombay', Value=50000, Transa

In [47]:
df.head(5)

[Row(Date='1/1/2022', Domain='RESTRAUNT', Location='Bhuj', Value=365554, Transaction_count=1932),
 Row(Date='1/1/2022', Domain='INVESTMENTS', Location='Ludhiana', Value=847444, Transaction_count=1721),
 Row(Date='1/1/2022', Domain='RETAIL', Location='Goa', Value=786941, Transaction_count=1573),
 Row(Date='1/1/2022', Domain='INTERNATIONAL', Location='Mathura', Value=368610, Transaction_count=2049),
 Row(Date='1/1/2022', Domain='RESTRAUNT', Location='Madurai', Value=615681, Transaction_count=1519),
 Row(Date='1/1/2022', Domain='INTERNATIONAL', Location='Daman', Value=1191092, Transaction_count=1813),
 Row(Date='1/1/2022', Domain='INTERNATIONAL', Location='Buxar', Value=968883, Transaction_count=2098),
 Row(Date='1/1/2022', Domain='PUBLIC', Location='Trichy', Value=1030297, Transaction_count=606),
 Row(Date='1/1/2022', Domain='RESTRAUNT', Location='Kullu', Value=688655, Transaction_count=1463),
 Row(Date='1/1/2022', Domain='MEDICAL', Location='Hyderabad', Value=1174302, Transaction_count=

In [49]:
# Delete Operation
df = df.filter(~((df.Date == '1/1/2022') & 
                 (df.Domain == 'RESTRAUNT') & 
                 (df.Location == 'Bhuj') & 
                 (df.Value == 365554) & 
                 (df.Transaction_count == 1932))) # Try delete the first row

df.head(5)

+--------+-------------+---------+-------+-----------------+
|    Date|       Domain| Location|  Value|Transaction_count|
+--------+-------------+---------+-------+-----------------+
|1/1/2022|  INVESTMENTS| Ludhiana| 847444|             1721|
|1/1/2022|       RETAIL|      Goa| 786941|             1573|
|1/1/2022|INTERNATIONAL|  Mathura| 368610|             2049|
|1/1/2022|    RESTRAUNT|  Madurai| 615681|             1519|
|1/1/2022|INTERNATIONAL|    Daman|1191092|             1813|
|1/1/2022|INTERNATIONAL|    Buxar| 968883|             2098|
|1/1/2022|       PUBLIC|   Trichy|1030297|              606|
|1/1/2022|    RESTRAUNT|    Kullu| 688655|             1463|
|1/1/2022|      MEDICAL|Hyderabad|1174302|             1463|
|1/1/2022|       PUBLIC|  Lucknow| 912902|             1035|
|1/1/2022|  INVESTMENTS|  Bikaner| 436534|             1093|
|1/1/2022|       PUBLIC| Amritsar| 849803|             2013|
|1/1/2022|  INVESTMENTS|  Mathura|1180043|             2068|
|1/1/2022|       RETAIL|

In [92]:
%%time
df.createOrReplaceTempView("bank")
spark.sql("SELECT * FROM bank").show(10)

+--------+-------------+---------+-------+-----------------+
|    Date|       Domain| Location|  Value|Transaction_count|
+--------+-------------+---------+-------+-----------------+
|1/1/2022|INTERNATIONAL|  Mathura| 368610|             2049|
|1/1/2022|INTERNATIONAL|    Buxar| 968883|             2098|
|1/1/2022|       PUBLIC| Amritsar| 849803|             2013|
|1/1/2022|  INVESTMENTS|  Mathura|1180043|             2068|
|1/1/2022|       RETAIL|  Lunglei| 521003|             2304|
|1/1/2022|  INVESTMENTS|   Kannur| 770080|             2431|
|1/1/2022|       RETAIL| Amritsar| 907177|             2042|
|1/1/2022|       RETAIL|    Bhind| 555766|             2035|
|1/1/2022|    RESTRAUNT|   Bhopal|1155932|             2082|
|1/1/2022|      MEDICAL|Hyderabad| 753346|             2427|
|1/1/2022|    EDUCATION| Banglore| 379817|             2196|
|1/1/2022|    EDUCATION| Amritsar| 799260|             2430|
|1/1/2022|  INVESTMENTS|   Trichy|1074381|             2467|
|1/1/2022|      MEDICAL|

In [93]:
%%time
spark.sql("SELECT * FROM bank WHERE Transaction_count > 2000").show(10)

+--------+-------------+---------+-------+-----------------+
|    Date|       Domain| Location|  Value|Transaction_count|
+--------+-------------+---------+-------+-----------------+
|1/1/2022|INTERNATIONAL|  Mathura| 368610|             2049|
|1/1/2022|INTERNATIONAL|    Buxar| 968883|             2098|
|1/1/2022|       PUBLIC| Amritsar| 849803|             2013|
|1/1/2022|  INVESTMENTS|  Mathura|1180043|             2068|
|1/1/2022|       RETAIL|  Lunglei| 521003|             2304|
|1/1/2022|  INVESTMENTS|   Kannur| 770080|             2431|
|1/1/2022|       RETAIL| Amritsar| 907177|             2042|
|1/1/2022|       RETAIL|    Bhind| 555766|             2035|
|1/1/2022|    RESTRAUNT|   Bhopal|1155932|             2082|
|1/1/2022|      MEDICAL|Hyderabad| 753346|             2427|
|1/1/2022|    EDUCATION| Banglore| 379817|             2196|
|1/1/2022|    EDUCATION| Amritsar| 799260|             2430|
|1/1/2022|  INVESTMENTS|   Trichy|1074381|             2467|
|1/1/2022|      MEDICAL|

In [94]:
%%time
spark.sql("SELECT Location, SUM(Value) as TotalValue FROM bank WHERE value > 2000 GROUP BY Location").show(10)



+---------+----------+
| Location|TotalValue|
+---------+----------+
|    Bhind|4235606990|
|    Daman|4172465801|
|   Konark|4131943527|
|    Ajmer|4182528511|
|    Akola|4044166341|
| Tirumala|4159849965|
|     Bhuj|4201359304|
|     Durg|4215933267|
|  Lucknow|4188089855|
|   Kochin|4185575218|
| Varanasi|4280956160|
|      Ara|4133335636|
|   Ranchi|4092574745|
|      Goa|4308827172|
|Ahmedabad|4193216509|
|   Kannur|4289790701|
|    Bidar|4168515545|
|      Mon|4196838877|
|  Kolkata|4214515994|
| Ludhiana|4153784833|
+---------+----------+
only showing top 20 rows

CPU times: user 12.4 ms, sys: 0 ns, total: 12.4 ms
Wall time: 3.51 s


                                                                                

In [96]:
%%time
spark.sql("SELECT Location, SUM(Value) as TotalValue FROM bank WHERE value > 2000 GROUP BY Location HAVING TotalValue > 4200000000").show(10)



+--------+----------+
|Location|TotalValue|
+--------+----------+
|   Bhind|4235606990|
|    Bhuj|4201359304|
|    Durg|4215933267|
|Varanasi|4280956160|
|     Goa|4308827172|
|  Kannur|4289790701|
| Kolkata|4214515994|
|Amritsar|4221405601|
|    Doda|4215708322|
| Vellore|4278822726|
| Patiala|4226165828|
|   Buxar|4205631132|
| Lunglei|4217838252|
+--------+----------+

CPU times: user 1.4 ms, sys: 11.8 ms, total: 13.2 ms
Wall time: 3.63 s


                                                                                