In [61]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as F
sc = SparkContext('local')
spark = SparkSession(sc)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local) created by __init__ at <ipython-input-1-0ab1766d4534>:4 

In [63]:
spark

# dataframe处理

## 1.1 用spark进行数据处理

### 规定导入数据的模式，相当于dataframe中提前规定好各个列的类型:

In [64]:
from pyspark.sql.types import *
peopleSchema = StructType([
    # Define the name field  
    StructField('name', StringType(), True),
    # Add the age field  
    StructField('age', IntegerType(), True),
    # Add the city field  
    StructField('city', StringType(), True)  
])

### Read CSV file containing data:

In [65]:
people_df = spark.read.format('csv').load(name='/spark_data/rawdata.csv', schema=peopleSchema)

In [71]:
people_df.head()

### Practice1.1: Defining a schema:

Creating a defined schema helps with data quality and import performance. As mentioned during the lesson, we'll create a simple schema to read in the following columns:

    Name
    Age
    City

The Name and City columns are StringType() and the Age column is an IntegerType().

In [72]:
from pyspark.sql.types import *
people_schema = StructType([
    StructField('name',StringType(),False),
    StructField('age',IntegerType(),False),
    StructField('city',StringType(),False)
])
peopledf1 = spark.read.format('csv').load(name='/spark_data/rawdata.csv',schema = people_schema)

In [73]:
peopledf1.head()

## 1.2 不可变性和懒惰计算

### 不可变性的例子

* Define a new data frame:

In [74]:
voter_df = spark.read.csv('spark_data/DallasCouncilVoters.csv',header=True) # 这种读取csv的方法比上面的要简单，更符合python的特点
voter_df.head(5)

[Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Jennifer S. Gates', year='2017'),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Philip T. Kingston', year='2017'),
 Row(DATE='02/08/2017', TITLE='Mayor', VOTER_NAME='Michael S. Rawlings', year='2017'),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Adam Medrano', year='2017'),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Casey Thomas', year=None)]

* Making changes:

In [76]:
voter_df = voter_df.withColumn('fullyear',     
                               voter_df.year + 2000) # 添加新列
voter_df.head(5)
#voter_df = voter_df.drop(voter_df.year)

[Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Jennifer S. Gates', year='2017', fullyear=4017.0),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Philip T. Kingston', year='2017', fullyear=4017.0),
 Row(DATE='02/08/2017', TITLE='Mayor', VOTER_NAME='Michael S. Rawlings', year='2017', fullyear=4017.0),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Adam Medrano', year='2017', fullyear=4017.0),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Casey Thomas', year=None, fullyear=None)]

### Lazy Processing

In [77]:
voter_df = voter_df.withColumn('fullyear',voter_df.year + 2000)
voter_df = voter_df.drop(voter_df.year)
voter_df.count()

44637

### Practice1.2: Using lazy processing
For this exercise, we'll be defining a Data Frame (aa_dfw_df) and add a couple transformations. Note the amount of time required for the transformations to complete when defined vs when the data is actually queried. These differences may be short, but they will be noticeable. When working with a full Spark cluster with larger quantities of data the difference will be more apparent.

In [79]:
# Load the CSV file
aa_dfw_df = spark.read.csv('./spark_data/AA_DFW_2018_Departures_Short.csv.gz',header = True)

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show()

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2018|         0005|                          498|    hnl|
|       01/01/2018|         0007|                          501|    ogg|
|       01/01/2018|         0043|                            0|    dtw|
|       01/01/2018|         0051|                          100|    stl|
|       01/01/2018|         0075|                          147|    dca|
|       01/01/2018|         0096|                           92|    stl|
|       01/01/2018|         0103|                          227|    sjc|
|       01/01/2018|         0119|                          517|    ogg|
|       01/01/2018|         0123|                          489|    hnl|
|       01/01/2018|         0128|                          141|    mco|
|       01/01/2018|         0132|                          201| 

## 1.3 理解 parquet：就是储存的一种形式，用spark处理这种数据的话速度可能更快

### Working with parquet files:

* Reading Parquet files

 写的两种方式

df.write.format('parquet').save('filename.parquet')
df.write.parquet('filename.parquet')

In [80]:
df = spark.read.csv('spark_data/DallasCouncilVoters.csv',header=True)
df.write.parquet("df.parquet")

In [81]:
df.head()

Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Jennifer S. Gates', year='2017')

### Parquet and SQL:

In [82]:
flight_df = spark.read.parquet('df.parquet')
flight_df.createOrReplaceTempView('flights')
short_flights_df = spark.sql('SELECT * FROM flights WHERE year = 2017')

In [83]:
short_flights_df.head(5)

[Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Jennifer S. Gates', year='2017'),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Philip T. Kingston', year='2017'),
 Row(DATE='02/08/2017', TITLE='Mayor', VOTER_NAME='Michael S. Rawlings', year='2017'),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Adam Medrano', year='2017'),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Casey Thomas"', year='2017')]

### Practice1.31: Saving a DataFrame in Parquet format
In this exercise, we're going to practice creating a new Parquet file and then process some data from it.

The spark object and the df1 and df2 DataFrames have been setup for you.

In [84]:
# View the row count of df1 and df2
df1 = spark.read.csv('spark_data/AA_DFW_2017_Departures_Short.csv',header=True)
df2 = spark.read.csv('spark_data/AA_DFW_2016_Departures_Short.csv',header=True)
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())

# Combine the DataFrames into one
df3 = df1.union(df2)

df1 Count: 139358
df2 Count: 140604


In [85]:
df3.head()

Row(Date (MM/DD/YYYY)='01/01/2017', Flight Number='0005', Destination Airport='HNL', Actual elapsed time (Minutes)='537')

In [86]:
# spark 中的dataframe的列名重命名
df3=df3.withColumnRenamed(df3.columns[0],'Date').withColumnRenamed(df3.columns[1],'Flight_number').withColumnRenamed(df3.columns[2],'Destination_port').withColumnRenamed(df3.columns[3],'elapsed_time_minutes')

In [87]:
df3

DataFrame[Date: string, Flight_number: string, Destination_port: string, elapsed_time_minutes: string]

In [88]:
# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('AA_DFW_ALL.parquet').count())

279962


### Practice1.32: SQL and Parquet
For this example, we're going to read in the Parquet file we created in the last exercise and register it as a SQL table. Once registered, we'll run a quick query against the table (aka, the Parquet file).

The spark object and the AA_DFW_ALL.parquet file are available for you automatically.

In [89]:
# Read the Parquet file into flights_df
flights_df = spark.read.parquet('./AA_DFW_ALL.parquet',header=True)

# Register the temp table
flights_df.createOrReplaceTempView('flights')

# Run a SQL query of the average flight duration
avg_duration = spark.sql('SELECT avg(elapsed_time_minutes) from flights').collect()[0] # 用collect获取到sql中提取到的信息
print('The average flight time is: %d minutes' % avg_duration)

The average flight time is: 151 minutes


# 第二部分：Spark中dataframe的操作

## 2.1 DataFrame 列操作

### Dataframe refresher

In [90]:
# Return rows where name starts with "M" 
voter_df.filter(voter_df.VOTER_NAME.like('M%'))
# Return name and position only
voters = voter_df.select('VOTER_NAME', 'DATE')

In [94]:
#练习使用sql
voter_df.createOrReplaceTempView('voters')
voters_num = spark.sql("select count(*) from voters where Date = '02/08/2017'").collect()

In [106]:
type(voters)

pyspark.sql.dataframe.DataFrame

### Common dataframe transformations

* Filter / Where

In [107]:
voter_df.filter(voter_df.DATE > '1/1/2018') # or voter_df.where(...)

DataFrame[DATE: string, TITLE: string, VOTER_NAME: string, fullyear: double]

In [111]:
voter_df.filter(voter_df.VOTER_NAME.like("A%"))

DataFrame[DATE: string, TITLE: string, VOTER_NAME: string, fullyear: double]

* Select

In [108]:
voter_df.select(voter_df.VOTER_NAME)

DataFrame[VOTER_NAME: string]

In [109]:
#或者用另一种方法
voter_df.select("VOTER_NAME")

DataFrame[VOTER_NAME: string]

* withColumn

In [112]:
voter_df.withColumn('year', F.year(F.date_format(voter_df.DATE,'dd/MM/yyyy'))) #添加新列

DataFrame[DATE: string, TITLE: string, VOTER_NAME: string, fullyear: double, year: int]

* drop

In [113]:
voter_df.drop('unused_column')

DataFrame[DATE: string, TITLE: string, VOTER_NAME: string, fullyear: double]

* rename

In [116]:
voter_df = voter_df.withColumnRenamed('fullyear',"full_year")
voter_df

DataFrame[DATE: string, TITLE: string, VOTER_NAME: string, full_year: double]

### Filtering data

In [47]:
voter_df.filter(voter_df['VOTER_NAME'].isNotNull())
voter_df.filter(F.year(F.date_format(voter_df.DATE,'dd/MM/yyyy')) > 1800)
voter_df.where(voter_df['TITLE'].contains('mayor'))
voter_df.where(~ voter_df.TITLE.isNull())

DataFrame[DATE: string, TITLE: string, VOTER_NAME: string, fullyear: double]

### Column string transformations

* Contained in pyspark.sql.functions

In [117]:
import pyspark.sql.functions as F

* Applied per column as transformation

In [120]:
voter_df.withColumn('upper', F.upper('VOTER_NAME')) #voter_df本身没有发生变化，只是新生成了个新对象

DataFrame[DATE: string, TITLE: string, VOTER_NAME: string, full_year: double, upper: string]

In [121]:
voter_df

DataFrame[DATE: string, TITLE: string, VOTER_NAME: string, full_year: double]

* Can create intermediary columns

In [122]:
voter_df1 = voter_df.withColumn('splits', F.split('VOTER_NAME',' '))
voter_df1.head(5)

[Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Jennifer S. Gates', full_year=4017.0, splits=['Jennifer', 'S.', 'Gates']),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Philip T. Kingston', full_year=4017.0, splits=['Philip', 'T.', 'Kingston']),
 Row(DATE='02/08/2017', TITLE='Mayor', VOTER_NAME='Michael S. Rawlings', full_year=4017.0, splits=['Michael', 'S.', 'Rawlings']),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Adam Medrano', full_year=4017.0, splits=['Adam', 'Medrano']),
 Row(DATE='02/08/2017', TITLE='Councilmember', VOTER_NAME='Casey Thomas', full_year=None, splits=['Casey', 'Thomas'])]

* Can cast to other types

In [125]:
voter_df.withColumn('year', voter_df['full_year'].cast(IntegerType())) #改变数据类型

DataFrame[DATE: string, TITLE: string, VOTER_NAME: string, full_year: double, year: int]

### Practice2.11: Filtering column content with Python
You've looked at using various operations on DataFrame columns - now you can modify a real dataset. The DataFrame voter_df contains information regarding the voters on the Dallas City Council from the past few years. This truncated DataFrame contains the date of the vote being cast and the name and position of the voter. Your manager has asked you to clean this data so it can later be integrated into some desired reports. The primary task is to remove any null entries or odd characters and return a specific set of voters where you can validate their information.

In [126]:
# Show the distinct VOTER_NAME entries
voter_df.select(voter_df['VOTER_NAME']).distinct().show(40, truncate=False)

# Filter voter_df where the VOTER_NAME is 1-20 characters in length
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|VOTER_NAME                                                                                                                                                                                                                                                                                                                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------

+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
+-------------------+
only showing top 5 rows



### Practice2.12: Modifying DataFrame columns
Previously, you filtered out any rows that didn't conform to something generally resembling a name. Now based on your earlier work, your manager has asked you to create two new columns - first_name and last_name. She asks you to split the VOTER_NAME column into words on any space character. You'll treat the last word as the last_name, and all other words as the first_name. You'll be using some new functions in this exercise including .split(), .size(), and .getItem(). The .getItem(index) takes an integer value to return the appropriately numbered item in the column. The functions .split() and .size() are in the pyspark.sql.functions library.

In [127]:
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, '\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0)) # 提取对应列表中的第一个值

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show() # show作为展示的函数

+----------+-------------+-------------------+---------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|full_year|first_name|last_name|
+----------+-------------+-------------------+---------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|   4017.0|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|   4017.0|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|   4017.0|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|   4017.0|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     null|     Casey|   Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|     null|   Carolyn|   Arnold|
|02/08/2017|Councilmember|       Scott Griggs|     null|     Scott|   Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|     null|        B.|  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|     null|       Lee| Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|     null|     San

## 2.2 Conditional DataFrame column operations

### Conditional example

In [128]:
# .when(<if condition>, <then x>)
df = spark.read.csv('./spark_data/example_data2.2.csv',header=True)
df.select(df.Name, df.Age, F.when(df.Age >= 18, "Adult"))

AnalysisException: Path does not exist: file:/home/u2020100177/spark_data/example_data2.2.csv;

### Another example

In [24]:
# Multiple .when()
df.select(df.Name, df.Age, 
        F.when(df.Age >= 18, "Adult")
        .when(df.Age < 18, "Minor"))

DataFrame[Name: string, Age: string, CASE WHEN (Age >= 18) THEN Adult WHEN (Age < 18) THEN Minor END: string]

### Otherwise

In [25]:
# .otherwise() is like else
df.select(df.Name, df.Age,
        F.when(df.Age >= 18, "Adult")
        .otherwise("Minor"))

DataFrame[Name: string, Age: string, CASE WHEN (Age >= 18) THEN Adult ELSE Minor END: string]

### Practice2.21: when() example
The when() clause lets you conditionally modify a Data Frame based on its content. You'll want to modify our voter_df DataFrame to add a random number to any voting member that is defined as a "Councilmember".

The voter_df DataFrame is defined and available to you. The pyspark.sql.functions library is available as F. You can use F.rand() to generate the random value.

In [26]:
# Add a column to voter_df for any voter with the title **Councilmember**
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand()))
# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()

+----------+-------------+-------------------+--------+----------+---------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|fullyear|first_name|last_name|          random_val|
+----------+-------------+-------------------+--------+----------+---------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  4017.0|  Jennifer|    Gates| 0.20874034947022668|
|02/08/2017|Councilmember| Philip T. Kingston|  4017.0|    Philip| Kingston|  0.9799904274449358|
|02/08/2017|        Mayor|Michael S. Rawlings|  4017.0|   Michael| Rawlings|                null|
|02/08/2017|Councilmember|       Adam Medrano|  4017.0|      Adam|  Medrano|  0.4259884690119301|
|02/08/2017|Councilmember|       Casey Thomas|    null|     Casey|   Thomas|  0.2852335617708246|
|02/08/2017|Councilmember|Carolyn King Arnold|    null|   Carolyn|   Arnold|  0.3143523605472355|
|02/08/2017|Councilmember|       Scott Griggs|    null|     Scott|   Griggs|  0.7511832745410593|
|02/08/2017|Councilm

### Practice2.22: When / Otherwise
This requirement is similar to the last, but now you want to add multiple values based on the voter's position. Modify your voter_df DataFrame to add a random number to any voting member that is defined as a Councilmember. Use 2 for the Mayor and 0 for anything other position.

In [27]:
# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))

# Show some of the DataFrame rows
voter_df.show()

# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val == 0).show()

+----------+-------------+-------------------+--------+----------+---------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|fullyear|first_name|last_name|         random_val|
+----------+-------------+-------------------+--------+----------+---------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  4017.0|  Jennifer|    Gates| 0.5331693319303494|
|02/08/2017|Councilmember| Philip T. Kingston|  4017.0|    Philip| Kingston|  0.742954133135259|
|02/08/2017|        Mayor|Michael S. Rawlings|  4017.0|   Michael| Rawlings|                2.0|
|02/08/2017|Councilmember|       Adam Medrano|  4017.0|      Adam|  Medrano|0.42359456000443174|
|02/08/2017|Councilmember|       Casey Thomas|    null|     Casey|   Thomas| 0.7781923096370825|
|02/08/2017|Councilmember|Carolyn King Arnold|    null|   Carolyn|   Arnold| 0.5128597088090691|
|02/08/2017|Councilmember|       Scott Griggs|    null|     Scott|   Griggs|  0.799667122371296|
|02/08/2017|Councilmember|   B

## 2.3 User defined functions

### Reverse string UDF

* Dene a Python method

In [129]:
def reverseString(mystr):
    return mystr[::-1]

* Wrap the function and store as a variable

In [161]:
udfReverseString = F.udf(reverseString, returnType = StringType())

* Use with Spark

In [151]:
user_df = spark.read.csv('./spark_data/DallasCouncilVoters.csv',header=True)
user_df.show()

+----------+-------------+-------------------+----+
|      DATE|        TITLE|         VOTER_NAME|year|
+----------+-------------+-------------------+----+
|02/08/2017|Councilmember|  Jennifer S. Gates|2017|
|02/08/2017|Councilmember| Philip T. Kingston|2017|
|02/08/2017|        Mayor|Michael S. Rawlings|2017|
|02/08/2017|Councilmember|       Adam Medrano|2017|
|02/08/2017|Councilmember|       Casey Thomas|null|
|02/08/2017|Councilmember|Carolyn King Arnold|null|
|02/08/2017|Councilmember|       Scott Griggs|null|
|02/08/2017|Councilmember|   B. Adam  McGough|null|
|02/08/2017|Councilmember|       Lee Kleinman|null|
|02/08/2017|Councilmember|      Sandy Greyson|null|
|02/08/2017|Councilmember|  Jennifer S. Gates|null|
|02/08/2017|Councilmember| Philip T. Kingston|null|
|02/08/2017|        Mayor|Michael S. Rawlings|null|
|02/08/2017|Councilmember|       Adam Medrano|null|
|02/08/2017|Councilmember|      Casey Thomas"|2017|
|02/08/2017|Councilmember|Carolyn King Arnold|2017|
|02/08/2017|

In [164]:
user_df = user_df.withColumn('ReverseName',
                udfReverseString(user_df.Name)) #udf 在使用上面有些问题

AttributeError: 'DataFrame' object has no attribute 'Name'

In [152]:
#练习一下F函数中的split函数，提取First name和last name
user_df = user_df.withColumn('splits',F.split('VOTER_NAME'," "))
user_df = user_df.withColumn("First_name",user_df.splits.getItem(0))
user_df = user_df.withColumn("Last_name",user_df.splits.getItem(F.size(user_df.splits)-1))
user_df = user_df.drop("splits")
# user_df = user_df.withColumn("Reversed_name",udfReverseString(user_df.First_name))
user_df.show()

+----------+-------------+-------------------+----+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|year|First_name|Last_name|
+----------+-------------+-------------------+----+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|2017|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|2017|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|2017|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|2017|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|null|     Casey|   Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|null|   Carolyn|   Arnold|
|02/08/2017|Councilmember|       Scott Griggs|null|     Scott|   Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|null|        B.|  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|null|       Lee| Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|null|     Sandy|  Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|null|

In [163]:
(udfReverseString(user_df['First_name'])).getItem(2)

Column<b'reverseString(First_name)[2]'>

False

### Argument-less example

In [31]:
def sortingCap():
    return random.choice(['G','H','R','S'])
udfSortingCap = F.udf(sortingCap, StringType())
user_df = user_df.withColumn('Class', udfSortingCap())

### Practice2.3: Using user defined functions in Spark
For this exercise, we'll use our voter_df DataFrame, but you're going to replace the first_name column with the first and middle names.

In [165]:
voter_df = voter_df.withColumn('splits', F.split(voter_df['VOTER_NAME'], '\s+'))

def getFirstAndMiddle(names):
  # Return a space separated string of names
  return ' '.join(names[:-1])

# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Show the DataFrame
voter_df.show()

+----------+-------------+-------------------+---------+----------+---------+--------------------+---------------------+
|      DATE|        TITLE|         VOTER_NAME|full_year|first_name|last_name|              splits|first_and_middle_name|
+----------+-------------+-------------------+---------+----------+---------+--------------------+---------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|   4017.0|  Jennifer|    Gates|[Jennifer, S., Ga...|          Jennifer S.|
|02/08/2017|Councilmember| Philip T. Kingston|   4017.0|    Philip| Kingston|[Philip, T., King...|            Philip T.|
|02/08/2017|        Mayor|Michael S. Rawlings|   4017.0|   Michael| Rawlings|[Michael, S., Raw...|           Michael S.|
|02/08/2017|Councilmember|       Adam Medrano|   4017.0|      Adam|  Medrano|     [Adam, Medrano]|                 Adam|
|02/08/2017|Councilmember|       Casey Thomas|     null|     Casey|   Thomas|     [Casey, Thomas]|                Casey|
|02/08/2017|Councilmember|Caroly

## 2.4 Partitioning and lazy processing

### Practice2.41: Adding an ID Field

When working with data, you sometimes only want to access certain fields and perform various operations. In this case, find all the unique voter names from the DataFrame and add a unique ID number. Remember that Spark IDs are assigned based on the DataFrame partition - as such the ID values may be much greater than the actual number of rows in the DataFrame.

With Spark's lazy processing, the IDs are not actually generated until an action is performed and can be somewhat random depending on the size of the dataset.

The spark session and a Spark DataFrame df containing the DallasCouncilVotes.csv.gz file are available in your workspace. The pyspark.sql.functions library is available under the alias F.

In [173]:
import os
os.listdir()
# ?os

['.bash_logout',
 '.ssh',
 '.profile',
 '.viminfo',
 'test_for_glh.ipynb',
 '.config',
 '.gtkrc-2.0',
 '.mozilla',
 '.bash_history',
 'AA_DFW_ALL.parquet',
 '.cache',
 '.local',
 '.dbus',
 'df.parquet',
 '.bash_profile',
 '.Xauthority',
 '.scala_history',
 '.ipython',
 'spark_data',
 'DataCleaning_Section1-4(1).ipynb',
 '.ipynb_checkpoints',
 '.bashrc',
 'AA_DFW_2018_Departures_Short.csv.gz',
 'nohup.out']

In [174]:
# Select all the unique council voters
voter_df = spark.read.csv('spark_data/DallasCouncilVoters.csv',header=True)
voter_df = voter_df.select(voter_df['VOTER_NAME']).distinct()

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)


There are 39 rows in the voter_df DataFrame.

+--------------------+-------------+
|          VOTER_NAME|       ROW_ID|
+--------------------+-------------+
|        Lee Kleinman|1709396983808|
|  the  final  201...|1700807049217|
|         Erik Wilson|1700807049216|
|  the  final   20...|1683627180032|
| Carolyn King Arnold|1632087572480|
| Rickey D.  Callahan|1597727834112|
|   the   final  2...|1443109011456|
|    Monica R. Alonzo|1382979469312|
|     Lee M. Kleinman|1228360646656|
|   Jennifer S. Gates|1194000908288|
+--------------------+-------------+
only showing top 10 rows



### Practice2.42: IDs with different partitions

You've just completed adding an ID field to a DataFrame. Now, take a look at what happens when you do the same thing on DataFrames containing a different number of partitions.

To check the number of partitions, use the method .rdd.getNumPartitions() on a DataFrame.

In [34]:
voter_df_single = voter_df.select('VOTER_NAME')
# Print the number of partitions in each DataFrame
print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())
print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())

# Add a ROW_ID field to each DataFrame
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
voter_df_single = voter_df_single.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the top 10 IDs in each DataFrame 
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)


There are 200 partitions in the voter_df DataFrame.


There are 200 partitions in the voter_df_single DataFrame.

+--------------------+-------------+
|          VOTER_NAME|       ROW_ID|
+--------------------+-------------+
|        Lee Kleinman|1709396983808|
|  the  final  201...|1700807049217|
|         Erik Wilson|1700807049216|
|  the  final   20...|1683627180032|
| Carolyn King Arnold|1632087572480|
| Rickey D.  Callahan|1597727834112|
|   the   final  2...|1443109011456|
|    Monica R. Alonzo|1382979469312|
|     Lee M. Kleinman|1228360646656|
|   Jennifer S. Gates|1194000908288|
+--------------------+-------------+
only showing top 10 rows

+--------------------+-------------+
|          VOTER_NAME|       ROW_ID|
+--------------------+-------------+
|        Lee Kleinman|1709396983808|
|  the  final  201...|1700807049217|
|         Erik Wilson|1700807049216|
|  the  final   20...|1683627180032|
| Carolyn King Arnold|1632087572480|
| Rickey D.  Callahan|1597727834112|
|   the 

### Practice2.43: More ID tricks

Once you define a Spark process, you'll likely want to use it many times. Depending on your needs, you may want to start your IDs at a certain value so there isn't overlap with previous runs of the Spark task. This behavior is similar to how IDs would behave in a relational database. You have been given the task to make sure that the IDs output from a monthly Spark task start at the highest value from the previous month.

In [35]:
voter_df = spark.read.csv('./spark_data/DallasCouncilVotes.csv',header=True)
voter_df_march = voter_df.select(voter_df.VOTER_NAME,F.when(voter_df.DATE=='03/01/2018',''))
voter_df_march = voter_df_march.withColumn('ROW_ID', F.monotonically_increasing_id())
voter_df_april = voter_df.select(voter_df.VOTER_NAME,F.when(voter_df.DATE=='04/11/2018',''))

In [36]:
# Determine the highest ROW_ID and save it in previous_max_ID
previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]

# Add a ROW_ID column to voter_df_april starting at the desired value
voter_df_april = voter_df_april.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID)

# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()

+------+
|ROW_ID|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows

+------+
|ROW_ID|
+------+
| 44624|
| 44625|
| 44626|
| 44627|
| 44628|
| 44629|
| 44630|
| 44631|
| 44632|
| 44633|
| 44634|
| 44635|
| 44636|
| 44637|
| 44638|
| 44639|
| 44640|
| 44641|
| 44642|
| 44643|
+------+
only showing top 20 rows



## 视频3.1 缓存

In [37]:
#在行动前调用.cache()
voter_df = spark.read.csv('voter_data.txt.gz',header=True)
voter_df.cache().count()

44625

In [38]:
voter_df = voter_df.withColumn('ID',voter_df.DATE+1)
voter_df = voter_df.cache()
voter_df.show()

+----------+-------------+-------------------+----+
|      DATE|        TITLE|         VOTER_NAME|  ID|
+----------+-------------+-------------------+----+
|02/08/2017|Councilmember|  Jennifer S. Gates|null|
|02/08/2017|Councilmember| Philip T. Kingston|null|
|02/08/2017|        Mayor|Michael S. Rawlings|null|
|02/08/2017|Councilmember|       Adam Medrano|null|
|02/08/2017|Councilmember|       Casey Thomas|null|
|02/08/2017|Councilmember|Carolyn King Arnold|null|
|02/08/2017|Councilmember|       Scott Griggs|null|
|02/08/2017|Councilmember|   B. Adam  McGough|null|
|02/08/2017|Councilmember|       Lee Kleinman|null|
|02/08/2017|Councilmember|      Sandy Greyson|null|
|02/08/2017|Councilmember|  Jennifer S. Gates|null|
|02/08/2017|Councilmember| Philip T. Kingston|null|
|02/08/2017|        Mayor|Michael S. Rawlings|null|
|02/08/2017|Councilmember|       Adam Medrano|null|
|02/08/2017|Councilmember|       Casey Thomas|null|
|02/08/2017|Councilmember|Carolyn King Arnold|null|
|02/08/2017|

In [39]:
#通过is_cached检查缓存状态
print(voter_df.is_cached)

True


In [40]:
#调用unpersist()释放缓存
voter_df.unpersist()

DataFrame[DATE: string, TITLE: string, VOTER_NAME: string, ID: double]

## 练习3.1 Caching a DataFrame 答案

In [41]:
import time
departures_df=spark.read.csv( "AA_DFW_2017.csv.gz")
#csv:AA_DFW_2017.csv.gz
start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))


Counting 139359 rows took 3.661939 seconds
Counting 139359 rows again took 0.881640 seconds


## 练习3.2 Removing a DataFrame from cache

In [42]:
#csv:AA_DFW_2017.csv.gz
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)


Is departures_df cached?: True
Removing departures_df from cache
Is departures_df cached?: False


## 视频3.2 How to split objects

In [43]:
#Use OS utilities/scripts(split,cut,awk)
#split -l 10000 -d largefile chunk-
#Use custom scripts Write out to Parquet
df_csv = spark.read.csv('AA_DFW_2014_Departures_Short.csv')
df_csv.write.parquet('data.parquet',mode='overwrite')
df = spark.read.parquet('data.parquet')

## 练习3.3 File import performance

File import performance
You've been given a large set of data to import into a Spark DataFrame. You'd like to test the difference in import speed by splitting up the file.

You have two types of files available: departures_full.txt.gz and departures_xxx.txt.gz where xxx is 000 - 013. The same number of rows is split between each file.

In [44]:
"""departures_000.txt.gz  departures_005.txt.gz  departures_010.txt.gz
departures_001.txt.gz  departures_006.txt.gz  departures_011.txt.gz
departures_002.txt.gz  departures_007.txt.gz  departures_012.txt.gz
departures_003.txt.gz  departures_008.txt.gz  departures_013.txt.gz
departures_004.txt.gz  departures_009.txt.gz  departures_full.txt.gz
"""
# File import performance
#You've been given a large set of data to import into a Spark DataFrame. You'd like to test the difference in import speed by splitting up the file.

#You have two types of files available: departures_full.txt.gz and departures_xxx.txt.gz where xxx is 000 - 013. The same number of rows is split between each file.
# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz')
#split_df = spark.read.csv('departures_0*.txt.gz')

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

#start_time_b = time.time()
#print("Total rows in split DataFrame:\t%d" % split_df.count())
#print("Time to run: %f" % (time.time() - start_time_b))

Total rows in full DataFrame:	139359
Time to run: 0.163822


## 练习3.4 Reading Spark configurations

In [54]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

Name: pyspark-shell
Driver TCP port: 41135
Number of partitions: 200


## 练习3.5 Writing Spark configurations

In [46]:
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv('departures.txt.gz').distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())

Partition count before change: 200
Partition count after change: 500


## 视频3.4 performance improvements

## 练习3.7 Normal joins

In [57]:
# Join the flights_df and aiports_df DataFrames
flights_df=spark.read.csv('spark_data/AA_DFW_2018_Departures_Short.csv.gz',header=True)
airports_df=spark.read.csv('spark_data/airportnames.txt.gz',header=True)
normal_df = flights_df.join(airports_df, \
    flights_df["Destination Airport"] == airports_df["IATA"] )

# Show the query plan
normal_df.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [Destination Airport#587], [IATA#610], Inner, BuildRight
:- *(2) Project [Date (MM/DD/YYYY)#585, Flight Number#586, Destination Airport#587, Actual elapsed time (Minutes)#588]
:  +- *(2) Filter isnotnull(Destination Airport#587)
:     +- FileScan csv [Date (MM/DD/YYYY)#585,Flight Number#586,Destination Airport#587,Actual elapsed time (Minutes)#588] Batched: false, DataFilters: [isnotnull(Destination Airport#587)], Format: CSV, Location: InMemoryFileIndex[file:/home/u2020100177/spark_data/AA_DFW_2018_Departures_Short.csv.gz], PartitionFilters: [], PushedFilters: [IsNotNull(Destination Airport)], ReadSchema: struct<Date (MM/DD/YYYY):string,Flight Number:string,Destination Airport:string,Actual elapsed ti...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true])), [id=#677]
   +- *(1) Project [AIRPORTNAME#609, IATA#610]
      +- *(1) Filter isnotnull(IATA#610)
         +- FileScan csv [AIRPORTNAME#609,IATA#610] Batched: fal

## 练习3.8 Using broadcasting on Spark joins

In [58]:
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(broadcast(airports_df), \
    flights_df["Destination Airport"] == airports_df["IATA"] )

# Show the query plan and compare against the original
broadcast_df.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [Destination Airport#587], [IATA#610], Inner, BuildRight
:- *(2) Project [Date (MM/DD/YYYY)#585, Flight Number#586, Destination Airport#587, Actual elapsed time (Minutes)#588]
:  +- *(2) Filter isnotnull(Destination Airport#587)
:     +- FileScan csv [Date (MM/DD/YYYY)#585,Flight Number#586,Destination Airport#587,Actual elapsed time (Minutes)#588] Batched: false, DataFilters: [isnotnull(Destination Airport#587)], Format: CSV, Location: InMemoryFileIndex[file:/home/u2020100177/spark_data/AA_DFW_2018_Departures_Short.csv.gz], PartitionFilters: [], PushedFilters: [IsNotNull(Destination Airport)], ReadSchema: struct<Date (MM/DD/YYYY):string,Flight Number:string,Destination Airport:string,Actual elapsed ti...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true])), [id=#709]
   +- *(1) Project [AIRPORTNAME#609, IATA#610]
      +- *(1) Filter isnotnull(IATA#610)
         +- FileScan csv [AIRPORTNAME#609,IATA#610] Batched: fal

## 练习3.9 Comparing broadcast vs normal joins

In [60]:
import time
start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))

Normal count:		119910	duration: 21.268620
Broadcast count:	119910	duration: 16.830747


使用广播功能可以提高查询效率

## 视频4.1 Introduction to data pipelines

## 练习4.1 Quick pipeline

In [50]:
# Import the data to a DataFrame
departures_df = spark.read.csv('2015-departures.csv.gz', header=True)

# Remove any duration of 0
departures_df = departures_df.filter(departures_df[3] > 0)

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json('output.json', mode='overwrite')

## 视频4.2 Data handling techniques

In [51]:
df1 = spark.read.csv('AA_DFW_2017.csv.gz', comment='#')
df1 = spark.read.csv('AA_DFW_2017.csv.gz', header='True')
df1 = spark.read.csv('AA_DFW_2017.csv.gz', sep=',')
df1 = spark.read.csv('AA_DFW_2017.csv.gz', sep='*')

## 练习4.2 Removing commented lines

In [4]:
# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')

def col(col_name):
    return annotations_df[col_name]

full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.where(col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

Full count: 32794
Comment count: 1416
Remaining count: 31378


## 练习4.3 Removing invalid rows

In [5]:
import pyspark.sql.functions as F
initial_count=31378
# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df["colcount"] < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

Initial count: 31378
Final count: 20580


## 练习4.4 Splitting into columns

In [6]:
import pyspark.sql.functions as F
# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df["_c0"], '\t')

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)

## 练习4.5 Further parsing

In [7]:
from pyspark.sql.types import ArrayType,StringType
def retriever(cols, colcount):
  # Return a list of dog data
  return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df.split_cols, split_df.colcount))

# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('split_cols').drop('colcount')

## 视频4.3 Data validation

## 练习4.6 Validate rows via join

In [52]:
import pyspark.sql.functions as F
valid_folders_df= spark.read.csv("spark_data/valid_folders.txt")
# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0', 'folder')

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames
joined_df = split_df.join(F.broadcast(valid_folders_df), "folder")

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))

NameError: name 'split_df' is not defined

## 练习4.7 Examining invalid rows

In [15]:
import pyspark.sql.functions as F
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows
invalid_df = split_df.join(F.broadcast(joined_df), 'folder', 'left_anti')

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)

 split_df:	32794
 joined_df:	20945
 invalid_df: 	11849
11223 distinct invalid folders found


## 视频4.4 Final analysis and delivery

## 练习4.8 Dog parsing

In [16]:
from pyspark.sql.types import ArrayType,StringType,StructType,StructField,IntegerType
# Select the dog details and show 10 untruncated rows
print(joined_df.select('dog_list').show(10, truncate=False))

# Define a schema type for the details in the dog list
DogType = StructType([
	StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False)
])

+----------------------------------+
|dog_list                          |
+----------------------------------+
|[affenpinscher,0,9,173,298]       |
|[Border_terrier,73,127,341,335]   |
|[kuvasz,0,0,499,327]              |
|[Great_Pyrenees,124,225,403,374]  |
|[schipperke,146,29,416,309]       |
|[groenendael,168,0,469,374]       |
|[Bedlington_terrier,10,12,462,332]|
|[Lhasa,39,1,499,373]              |
|[Kerry_blue_terrier,17,16,300,482]|
|[vizsla,112,93,276,236]           |
+----------------------------------+
only showing top 10 rows

None


## 练习4.9 Per image count

In [17]:
# Create a function to return the number and type of dogs as a tuple
def dogParse(doglist):
  dogs = []
  for dog in doglist:
    (breed, start_x, start_y, end_x, end_y) = dog.split(',')
    dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
  return dogs

# Create a UDF
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to list of dogs and drop the old column
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)

+----------+
|size(dogs)|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
+----------+
only showing top 10 rows



## 练习4.10 Percentage dog pixels

In [18]:
# Define a UDF to determine the number of pixels per image
def dogPixelCount(doglist):
  totalpixels = 0
  for dog in doglist:
    totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
  return totalpixels

# Define a UDF for the pixel count
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount('dogs'))

# Create a column representing the percentage of pixels
joined_df = joined_df.withColumn('dog_percent', (joined_df.dog_pixels / (joined_df.width * joined_df.height)) * 100)

# Show the first 10 annotations with more than 60% dog
joined_df.where('dog_percent > 60').show(10)

+--------+---------------+-----+------+--------------------+----------+-----------------+
|  folder|       filename|width|height|                dogs|dog_pixels|      dog_percent|
+--------+---------------+-----+------+--------------------+----------+-----------------+
|02110627|n02110627_12938|  200|   300|[[affenpinscher, ...|     49997|83.32833333333333|
|02104029|   n02104029_63|  500|   375|[[kuvasz, 0, 0, 4...|    163173|          87.0256|
|02105056| n02105056_2834|  500|   375|[[groenendael, 16...|    112574|60.03946666666666|
|02093647|  n02093647_541|  500|   333|[[Bedlington_terr...|    144640|86.87087087087087|
|02098413| n02098413_1355|  500|   375|[[Lhasa, 39, 1, 4...|    171120|           91.264|
|02093859| n02093859_2309|  330|   500|[[Kerry_blue_terr...|    131878|79.92606060606062|
|02109961| n02109961_1017|  475|   500|[[Eskimo_dog, 43,...|    189189|79.65852631578947|
|02108000| n02108000_3491|  600|   450|[[EntleBucher, 30...|    168667|62.46925925925926|
|02085782|

TypeError: get() missing 1 required positional argument: 'key'