In [25]:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('pyspark') \
    .getOrCreate()
# 原始数据 
test = spark.createDataFrame([('001','1',100,87,67,83,98), ('002','2',87,81,90,83,83), ('003','3',86,91,83,89,63),
                            ('004','2',65,87,94,73,88), ('005','1',76,62,89,81,98), ('006','3',84,82,85,73,99),
                            ('007','3',56,76,63,72,87), ('008','1',55,62,46,78,71), ('009','2',63,72,87,98,64)],
                             ['number','class','language','math','english','physic','chemical'])
test.show()

+------+-----+--------+----+-------+------+--------+
|number|class|language|math|english|physic|chemical|
+------+-----+--------+----+-------+------+--------+
|   001|    1|     100|  87|     67|    83|      98|
|   002|    2|      87|  81|     90|    83|      83|
|   003|    3|      86|  91|     83|    89|      63|
|   004|    2|      65|  87|     94|    73|      88|
|   005|    1|      76|  62|     89|    81|      98|
|   006|    3|      84|  82|     85|    73|      99|
|   007|    3|      56|  76|     63|    72|      87|
|   008|    1|      55|  62|     46|    78|      71|
|   009|    2|      63|  72|     87|    98|      64|
+------+-----+--------+----+-------+------+--------+



任务1：PySpark数据处理

    步骤1：使用Python链接Spark环境
    步骤2：创建dateframe数据
    步骤3：用spark执行以下逻辑：找到数据行数、列数
    步骤4：用spark筛选class为1的样本
    步骤5：用spark筛选language >90 或 math> 90的样本

任务2：PySpark数据统计

    步骤1：读取文件https://cdn.coggle.club/Pokemon.csv
    步骤2：将读取的进行保存，表头也需要保存
    步骤3：分析每列的类型，取值个数
    步骤4：分析每列是否包含缺失值

In [26]:
from pyspark import SparkFiles
spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv')
df = spark.read.csv("file://"+SparkFiles.get("Pokemon.csv"), header=True, inferSchema= True)
df = df.withColumnRenamed('Sp. Atk', 'Sp Atk')
df = df.withColumnRenamed('Sp. Def', 'Sp Def')

In [27]:
df.columns

['Name',
 'Type 1',
 'Type 2',
 'Total',
 'HP',
 'Attack',
 'Defense',
 'Sp Atk',
 'Sp Def',
 'Speed',
 'Generation',
 'Legendary']

In [28]:
df.describe

<bound method DataFrame.describe of DataFrame[Name: string, Type 1: string, Type 2: string, Total: int, HP: int, Attack: int, Defense: int, Sp Atk: int, Sp Def: int, Speed: int, Generation: int, Legendary: boolean]>

In [29]:
df.count

<bound method DataFrame.count of DataFrame[Name: string, Type 1: string, Type 2: string, Total: int, HP: int, Attack: int, Defense: int, Sp Atk: int, Sp Def: int, Speed: int, Generation: int, Legendary: boolean]>

In [30]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [31]:
# spark is an existing SparkSession
df = spark.read.json("./people.json")
# Displays the content of the DataFrame to stdout
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [32]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [33]:
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [34]:
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [35]:
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [36]:
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



In [37]:
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [38]:
df.createGlobalTempView("people")

In [39]:
spark.sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [40]:
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [41]:
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("./people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin

Name: Justin


In [42]:
from pyspark.sql.types import StringType, StructType, StructField

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("./people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [44]:
df = spark.read.load("./users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

AnalysisException: path file:/home/googler/Desktop/AI Study/spark/namesAndFavColors.parquet already exists.

In [45]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [46]:
df = spark.read.load("./people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

AnalysisException: path file:/home/googler/Desktop/AI Study/spark/namesAndAges.parquet already exists.

In [47]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [48]:
df = spark.read.load("./people.csv",
                     format="csv", sep=";", inferSchema="true", header="true")

In [49]:
df.show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+



In [50]:
df = spark.read.orc("./users.orc")
(df.write.format("orc")
    .option("orc.bloom.filter.columns", "favorite_color")
    .option("orc.dictionary.key.threshold", "1.0")
    .option("orc.column.encoding.direct", "name")
    .save("users_with_options.orc"))

AnalysisException: path file:/home/googler/Desktop/AI Study/spark/users_with_options.orc already exists.

In [51]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [34]:
df = spark.read.parquet("./users.parquet")
(df.write.format("parquet")
    .option("parquet.bloom.filter.enabled#favorite_color", "true")
    .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
    .option("parquet.enable.dictionary", "true")
    .option("parquet.page.write-checksum.enabled", "false")
    .save("users_with_options.parquet"))

In [35]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [36]:
df = spark.sql("SELECT * FROM parquet.`./users.parquet`")

In [37]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [38]:
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

AnalysisException: sort column age is not defined in table people_bucketed, defined table columns are: name, favorite_color, favorite_numbers

In [39]:
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

In [40]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [41]:
df = spark.read.parquet("./users.parquet")
(df
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("users_partitioned_bucketed"))

In [44]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [46]:
# enable ignore corrupt files
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
test_corrupt_df = spark.read.parquet("./dir1/",
                                     "./dir1/dir2/")
test_corrupt_df.show()

+-------------+
|         file|
+-------------+
|file1.parquet|
|file2.parquet|
+-------------+



In [48]:
df = spark.read.load("./dir1",
                     format="parquet", pathGlobFilter="*.parquet")
df.show()

+-------------+
|         file|
+-------------+
|file1.parquet|
+-------------+



In [49]:
recursive_loaded_df = spark.read.format("parquet")\
    .option("recursiveFileLookup", "true")\
    .load("./dir1")
recursive_loaded_df.show()

+-------------+
|         file|
+-------------+
|file1.parquet|
|file2.parquet|
+-------------+



In [50]:
# Only load files modified before 07/1/2050 @ 08:30:00
df = spark.read.load("./dir1",
                     format="parquet", modifiedBefore="2050-07-01T08:30:00")
df.show()

+-------------+
|         file|
+-------------+
|file1.parquet|
+-------------+



In [53]:
peopleDF = spark.read.json("./people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()

+------+
|  name|
+------+
|Justin|
+------+



In [54]:
from pyspark.sql import Row

# spark is from the previous example.
# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext

squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")

# Read the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

root
 |-- single: long (nullable = true)
 |-- double: long (nullable = true)
 |-- triple: long (nullable = true)
 |-- key: integer (nullable = true)



In [52]:
 #spark is from the previous example.
sc = spark.sparkContext

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "./people.json"
peopleDF = spark.read.json(path)

# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+------+
|  name|
+------+
|Justin|
+------+

+----------------+----+
|         address|name|
+----------------+----+
|{Columbus, Ohio}| Yin|
+----------------+----+



In [72]:
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SHOW COLUMNS IN people")
# SHOW COLUMNS IN customer
sqlDF.count()

3

In [75]:
sqlDF.show()

+----------------+
|        col_name|
+----------------+
|            name|
|  favorite_color|
|favorite_numbers|
+----------------+



In [53]:
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [24]:
# spark is from the previous example
sc = spark.sparkContext

# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "./people.csv"

df = spark.read.csv(path)
df.show()
# +------------------+
# |               _c0|
# +------------------+
# |      name;age;job|
# |Jorge;30;Developer|
# |  Bob;32;Developer|
# +------------------+

# Read a csv with delimiter, the default delimiter is ","
# df2 = spark.read.option(delimiter=';').csv(path)
# df2.show()
# +-----+---+---------+
# |  _c0|_c1|      _c2|
# +-----+---+---------+
# | name|age|      job|
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age|      job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)

# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "/home/googler/Desktop/AIStudy/spark/"
df5 = spark.read.csv(folderPath)
df5.show()

AttributeError: module 'pyspark' has no attribute 'sparkContext'

In [69]:
from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()

AnalysisException: Hive support is required to CREATE Hive TABLE (AS SELECT);
'CreateTable `default`.`src`, Ignore


In [19]:
from pyspark import SparkContext as sc

In [20]:
from pyspark.sql.functions import *

In [22]:
from pyspark import SparkContext, SparkConf