In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
import pyspark.sql.functions as func

In [2]:
# Create the SparkSession for this notebook (or reuse one that already exists),
# registered under the application name "sparksql3".
spark = (
    SparkSession.builder
    .appName("sparksql3")
    .getOrCreate()
)

In [3]:
# Explicit schema for the CSV read below: four columns, every one nullable.
mySchema = StructType([
    StructField(column, dtype, True)
    for column, dtype in (
        ("userID", IntegerType()),
        ("name", StringType()),
        ("age", IntegerType()),
        ("friends", IntegerType()),
    )
])

In [4]:
# Read the CSV file into a DataFrame, applying the explicit schema
# (schema inference is switched off).
people = (
    spark.read
    .format('csv')
    .option('inferSchema', 'False')
    .schema(mySchema)
    .load('file:///D:/code/python/spark/fakefriends.csv')
)

In [5]:
# Inspect the schema structure: column names, types and nullability.
people.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- friends: integer (nullable = true)



In [10]:
# Preview the loaded data; show() prints only the first 20 rows here.
people.show()

+------+--------+---+-------+
|userID|    name|age|friends|
+------+--------+---+-------+
|     0|    Will| 33|    385|
|     1|Jean-Luc| 26|      2|
|     2|    Hugh| 55|    221|
|     3|  Deanna| 40|    465|
|     4|   Quark| 68|     21|
|     5|  Weyoun| 59|    318|
|     6|  Gowron| 37|    220|
|     7|    Will| 54|    307|
|     8|  Jadzia| 38|    380|
|     9|    Hugh| 27|    181|
|    10|     Odo| 53|    191|
|    11|     Ben| 57|    372|
|    12|   Keiko| 54|    253|
|    13|Jean-Luc| 56|    444|
|    14|    Hugh| 43|     49|
|    15|     Rom| 36|     49|
|    16|  Weyoun| 22|    323|
|    17|     Odo| 35|     13|
|    18|Jean-Luc| 45|    455|
|    19|  Geordi| 60|    246|
+------+--------+---+-------+
only showing top 20 rows



In [6]:
# Keep only people younger than 30, stamp each row with the current timestamp
# in a new "insert_ts" column, sort by userID, and cache the result because it
# is reused by the SQL query and the CSV write in later cells.
output = (
    people
    .select(people["userID"], people["name"], people["age"], people["friends"])
    .filter(people["age"] < 30)
    .withColumn('insert_ts', func.current_timestamp())
    .orderBy(people["userID"])
    .cache()
)

In [7]:
# Register the cached DataFrame as the temp view "peoples" so it can be queried with SQL.
output.createOrReplaceTempView("peoples")

In [8]:
# Query the "peoples" temp view with Spark SQL; the insert_ts column is left out of the projection.
spark.sql("SELECT userID,name,age,friends from peoples").show()

+------+--------+---+-------+
|userID|    name|age|friends|
+------+--------+---+-------+
|     1|Jean-Luc| 26|      2|
|     9|    Hugh| 27|    181|
|    16|  Weyoun| 22|    323|
|    21|   Miles| 19|    268|
|    24|  Julian| 25|      1|
|    25|     Ben| 21|    445|
|    26|  Julian| 22|    100|
|    32|     Nog| 26|    281|
|    35| Beverly| 27|    305|
|    46|    Morn| 25|     96|
|    47|   Brunt| 24|     49|
|    48|     Nog| 20|      1|
|    52| Beverly| 19|    269|
|    54|   Brunt| 19|      5|
|    60|  Geordi| 20|    100|
|    66|  Geordi| 21|    477|
|    72|  Kasidy| 22|    179|
|    73|   Brunt| 20|    384|
|    84|     Ben| 28|    311|
|    89|    Worf| 24|    492|
+------+--------+---+-------+
only showing top 20 rows



In [9]:
# Write the filtered rows out as CSV, one sub-directory per distinct "age"
# value, replacing whatever already exists at the target path.
# NOTE(review): this cell was originally annotated "error message", so the
# write presumably failed when it was run. The cause is not visible here
# (possibly a local Hadoop/winutils setup issue on Windows) - TODO confirm.
(
    output.write
    .format("csv")
    .partitionBy("age")
    .mode("overwrite")
    .save("file:///D:/code/python/spark/output/op/")
)
