In [1]:
import pyspark 
from pyspark.sql import SparkSession

In [2]:
spark:SparkSession = SparkSession.builder \
      .appName("Dataframe test") \
      .getOrCreate()   
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/02 19:21:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/02 19:21:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


**Create Dataframe**


In [3]:
#Create df from data 
data = [('Alice', 1, 2001), ('Bob', 2, 2002), ('Bob', 2, 1999)]
df1 = spark.createDataFrame(data, ['name', 'age', 'birth year'])
df1.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-----+---+----------+
| name|age|birth year|
+-----+---+----------+
|Alice|  1|      2001|
|  Bob|  2|      2002|
|  Bob|  2|      1999|
+-----+---+----------+



                                                                                

In [4]:
#Create df from rdd

#data
columns = ["language","users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

rdd = spark.sparkContext.parallelize(data)
print("Data: ")
print(rdd.collect())

print("Schema: ")
dfFromRDD = rdd.toDF(columns)
dfFromRDD.printSchema()
dfFromRDD.show()

Data: 
[('Java', '20000'), ('Python', '100000'), ('Scala', '3000')]
Schema: 
root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



In [25]:
#Create df with data and schema
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType, BooleanType

data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])

df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



                                                                                

In [21]:
#Create df by reading csv file
df2 = spark.read.options(header = True, delimiter = ',') \
    .csv("file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode.csv")
df2.show(vertical = True)

-RECORD 0-----------------------------------
 RecordNumber        | 1                    
 Zipcode             | 704                  
 ZipCodeType         | STANDARD             
 City                | PARC PARQUE          
 State               | PR                   
 LocationType        | NOT ACCEPTABLE       
 Lat                 | 17.96                
 Long                | -66.22               
 Xaxis               | 0.38                 
 Yaxis               | -0.87                
 Zaxis               | 0.3                  
 WorldRegion         | NA                   
 Country             | US                   
 LocationText        | Parc Parque, PR      
 Location            | NA-US-PR-PARC PARQUE 
 Decommisioned       | FALSE                
 TaxReturnsFiled     | null                 
 EstimatedPopulation | null                 
 TotalWages          | null                 
 Notes               | null                 
-RECORD 1-----------------------------------
 RecordNum

In [24]:
#Create df by reading all file in folder (cac file phai co cung schema)
df2 = spark.read.options(header = True, delimiter = ',') \
    .csv("file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data")
print(df2.count())
df2.show(vertical = True)

                                                                                

42
-RECORD 0-----------------------------------
 RecordNumber        | 1                    
 Zipcode             | 704                  
 ZipCodeType         | STANDARD             
 City                | PARC PARQUE          
 State               | PR                   
 LocationType        | NOT ACCEPTABLE       
 Lat                 | 17.96                
 Long                | -66.22               
 Xaxis               | 0.38                 
 Yaxis               | -0.87                
 Zaxis               | 0.3                  
 WorldRegion         | NA                   
 Country             | US                   
 LocationText        | Parc Parque, PR      
 Location            | NA-US-PR-PARC PARQUE 
 Decommisioned       | FALSE                
 TaxReturnsFiled     | null                 
 EstimatedPopulation | null                 
 TotalWages          | null                 
 Notes               | null                 
-RECORD 1-----------------------------------
 Record

In [29]:
#Voi 1 schema cu the
schema = StructType() \
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("City",StringType(),True) \
      .add("State",StringType(),True) \
      .add("Lat",DoubleType(),True) \
      .add("Long",DoubleType(),True) \
      .add("WorldRegion",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("LocationText",StringType(),True) \
      .add("Location",StringType(),True) \
      .add("EstimatedPopulation",IntegerType(),True) \
      .add("Notes",StringType(),True)
      
df_with_schema = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data")

df_with_schema.show(vertical=True)

22/08/02 19:42:52 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode.csv
-RECORD 0----------------------------------
 RecordNumber        | 1                   
 Zipcode             | 704                 
 City                | STANDARD            
 State               | PARC PARQUE         
 Lat                 | null                
 Long                | null                
 WorldRegion         | 17.96               
 Country             | -66.22              
 LocationText        | 0.38                
 Location            | -0.87               
 EstimatedPopulation | null                
 Notes               | NA                  
-RECORD 1----------------------------------
 RecordNumber        | 2                   
 Zipcode             | 704                 
 City                | STANDARD            
 Sta

**Save Dataframe**

In [32]:
#overwrite – mode is used to overwrite the existing file.
#append – To add the data to the existing file.
#ignore – Ignores write operation when the file already exists.
#error – This is a default option when the file already exists, it returns an error.

df_with_schema.write.option("header",True).mode("append") \
 .csv("file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/save_zipcode")


22/08/02 19:47:59 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode2.csv
22/08/02 19:47:59 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode.csv


**Transformations**

In [36]:
#data 
print(f"Number of records {df_with_schema.count()}") #Ham count la action
df_with_schema.show(vertical=True)

Number of records 42


                                                                                

22/08/02 19:53:27 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode.csv
-RECORD 0----------------------------------
 RecordNumber        | 1                   
 Zipcode             | 704                 
 City                | STANDARD            
 State               | PARC PARQUE         
 Lat                 | null                
 Long                | null                
 WorldRegion         | 17.96               
 Country             | -66.22              
 LocationText        | 0.38                
 Location            | -0.87               
 EstimatedPopulation | null                
 Notes               | NA                  
-RECORD 1----------------------------------
 RecordNumber        | 2                   
 Zipcode             | 704                 
 City                | STANDARD            
 Sta

In [38]:
#Select 
selectColDf = df_with_schema.select("Zipcode", "City")
selectColDf.show()

[Stage 56:>                                                         (0 + 0) / 1]

22/08/02 19:55:18 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Zipcode, ZipCodeType
 Schema: Zipcode, City
Expected: City but found: ZipCodeType
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode.csv
+-------+--------+
|Zipcode|    City|
+-------+--------+
|    704|STANDARD|
|    704|STANDARD|
|    709|STANDARD|
|  76166|  UNIQUE|
|  76177|STANDARD|
|  76177|STANDARD|
|    704|STANDARD|
|  85209|STANDARD|
|  85210|STANDARD|
|  32046|STANDARD|
|  34445|  PO BOX|
|  32564|STANDARD|
|  34487|  PO BOX|
|    708|STANDARD|
|    704|STANDARD|
|  36275|  PO BOX|
|  35146|STANDARD|
|  35585|STANDARD|
|  27007|STANDARD|
|  27203|STANDARD|
+-------+--------+
only showing top 20 rows



                                                                                

In [75]:
#drop 
dropedDf = df_with_schema.drop("Lat", "Long", "WorldRegion", "LocationText", "Notes")
dropedDf.show()

22/08/02 21:28:53 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: RecordNumber, Zipcode, ZipCodeType, City, Long, Yaxis, Zaxis
 Schema: RecordNumber, Zipcode, City, State, Country, Location, EstimatedPopulation
Expected: City but found: ZipCodeType
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode.csv
+------------+-------+--------+-------------------+-------+--------+-------------------+
|RecordNumber|Zipcode|    City|              State|Country|Location|EstimatedPopulation|
+------------+-------+--------+-------------------+-------+--------+-------------------+
|           1|    704|STANDARD|        PARC PARQUE| -66.22|   -0.87|               null|
|           2|    704|STANDARD|PASEO COSTA DEL SUR| -66.22|   -0.87|               null|
|          10|    709|STANDARD|       BDA SAN LUIS| -66.26|   -0.86|               null|
|       61391|  76166|  UNIQUE|  CINGULAR WIRELESS| -97.31|   -0.83|               null|
|       6

In [56]:
#Filter 3 cach deu dc
from pyspark.sql.functions import col
filteredDf = dropedDf.filter(col("Zipcode") > 30000) \
                     .filter("RecordNumber >= 50000") \
                     .filter(dropedDf.City == "STANDARD")
filteredDf.show()

#ham where y het

22/08/02 20:08:12 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: RecordNumber, Zipcode, ZipCodeType, City, Long, Yaxis, Zaxis
 Schema: RecordNumber, Zipcode, City, State, Country, Location, EstimatedPopulation
Expected: City but found: ZipCodeType
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode.csv
22/08/02 20:08:12 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: RecordNumber, Zipcode, ZipCodeType, City, Long, Yaxis, Zaxis
 Schema: RecordNumber, Zipcode, City, State, Country, Location, EstimatedPopulation
Expected: City but found: ZipCodeType
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode2.csv
+------------+-------+--------+-----------+-------+--------+-------------------+
|RecordNumber|Zipcode|    City|      State|Country|Location|EstimatedPopulation|
+------------+-------+--------+-----------+-------+--------+-------------------+
|       61392|  7617

In [57]:
#distinct Loai bo cac dong bi trung lap - Noi chung luc dau la 2 file y het nhau dc load vao, h loai trung lap la con 1 nua 
distinctDf = df_with_schema.distinct()
print(f"Number of records {distinctDf.count()}") #Ham count la action
distinctDf.show()

22/08/02 20:10:03 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode.csv
22/08/02 20:10:03 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode2.csv


                                                                                

Number of records 21
22/08/02 20:10:06 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode2.csv
22/08/02 20:10:06 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode.csv
+------------+-------+--------+-------------------+----+----+-----------+-------+------------+--------+-------------------+-----+
|RecordNumber|Zipcode|    City|              State| Lat|Long|WorldRegion|Country|LocationText|Location|EstimatedPopulation|Notes|
+------------+-------+--------+-------------------+----+----+-----------+-------+------------+--------+-------------------+-----+
|       49348|  34487|  PO BOX|          HOMOSASSA|null|null|      28.78| -82.61|        0.1

In [78]:
#Sort theo mot cot nao do
sortedDf = distinctDf.sort(distinctDf.Zipcode.desc())
sortedDf.show()

[Stage 102:>                                                        (0 + 2) / 2]

22/08/02 21:33:28 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode.csv
22/08/02 21:33:28 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode2.csv


[Stage 104:>                                                        (0 + 1) / 1]

+------------+-------+--------+-----------------+----+----+-----------+-------+------------+--------+-------------------+-----+
|RecordNumber|Zipcode|    City|            State| Lat|Long|WorldRegion|Country|LocationText|Location|EstimatedPopulation|Notes|
+------------+-------+--------+-----------------+----+----+-----------+-------+------------+--------+-------------------+-----+
|       39828|  85210|STANDARD|             MESA|null|null|      33.38|-111.84|       -0.31|   -0.77|               null|   NA|
|       39827|  85209|STANDARD|             MESA|null|null|      33.37|-111.64|        -0.3|   -0.77|               null|   NA|
|       61392|  76177|STANDARD|       FORT WORTH|null|null|      32.75| -97.33|        -0.1|   -0.83|               null|   NA|
|       61393|  76177|STANDARD|         FT WORTH|null|null|      32.75| -97.33|        -0.1|   -0.83|               null|   NA|
|       61391|  76166|  UNIQUE|CINGULAR WIRELESS|null|null|      32.72| -97.31|        -0.1|   -0.83|   

                                                                                

**Action**

In [60]:
#describe
describeDf = distinctDf.describe()
describeDf.show(vertical=True)

22/08/02 20:13:31 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode.csv
22/08/02 20:13:31 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 20, schema size: 12
CSV file: file:///home/iloveu/BK_bat_diet/20212/BigData/spark_test/zipcode_data/zipcode2.csv
-RECORD 0----------------------------------
 summary             | count               
 RecordNumber        | 21                  
 Zipcode             | 21                  
 City                | 21                  
 State               | 21                  
 Lat                 | 0                   
 Long                | 0                   
 WorldRegion         | 21                  
 Country             | 21                  
 LocationText        | 21                  
 Location          

In [72]:
#Explore data
print("Zipcode information: ")
describeData = describeDf.collect()
for data in describeData:
    print(f"{data.__getitem__('summary')} : {data.__getitem__('Zipcode')}")

Zipcode information: 
count : 21
mean : 34530.19047619047
stddev : 29480.87406204071
min : 704
max : 85210
