# Spark Schema

In [1]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

pplSchema = StructType([
    # 이름 필드 정의
    StructField('name', StringType(), True), 
    
    # 나이 필드 정의
    StructField('name', IntegerType(), True), 
    
    # 도시 필드 정의
    StructField('city', StringType(), True), 
])

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("MLSampleTutorial").getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x00000133B35001C0>


In [3]:
import pyspark.sql.functions as F

# Load the CSV file
aa_dfw_df = spark.read.format('csv').options(Header=True).load('data/AA_DFW_2014_Departures_Short.csv.gz')

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show()

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2014|         0005|                          519|    hnl|
|       01/01/2014|         0007|                          505|    ogg|
|       01/01/2014|         0035|                          174|    slc|
|       01/01/2014|         0043|                          153|    dtw|
|       01/01/2014|         0052|                          137|    pit|
|       01/01/2014|         0058|                          174|    san|
|       01/01/2014|         0060|                          155|    mia|
|       01/01/2014|         0064|                          185|    jfk|
|       01/01/2014|         0090|                          126|    ord|
|       01/01/2014|         0096|                           91|    stl|
|       01/01/2014|         0099|                          182| 

- parquet으로 저장함. 
- apache pyarrow install
`pip install pyarrow'
- 가상의 parquet 파일을 생성한다. 

In [4]:
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import pyarrow as pa

df = pd.DataFrame({'one': [-1, 100, 2.5], 
                   'two': ['foo', 'bar', 'baz'],
                   'three': [True, False, True]}, index=list('abc'))

table = pa.Table.from_pandas(df)
pq.write_table(table, 'data/example.parquet')

print(df)
table

     one  two  three
a   -1.0  foo   True
b  100.0  bar  False
c    2.5  baz   True


pyarrow.Table
one: double
two: string
three: bool
__index_level_0__: string
----
one: [[-1,100,2.5]]
two: [["foo","bar","baz"]]
three: [[true,false,true]]
__index_level_0__: [["a","b","c"]]

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("MLSampleTutorial").getOrCreate()
spark.catalog.listTables('default')
spark.catalog.listDatabases()

[Database(name='default', description='default database', locationUri='file:/C:/Users/GREEN/Desktop/pyspark_tutorial/section_02_data_cleaning/spark-warehouse')]

In [6]:
spark.sql('show databases').show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [7]:
spark.catalog.currentDatabase()

'default'

In [8]:
spark.read.parquet('data/example.parquet').show()

+-----+---+-----+-----------------+
|  one|two|three|__index_level_0__|
+-----+---+-----+-----------------+
| -1.0|foo| true|                a|
|100.0|bar|false|                b|
|  2.5|baz| true|                c|
+-----+---+-----+-----------------+



In [9]:
example_df = spark.read.parquet('data/example.parquet')

# Register the temp table
example_df.createOrReplaceTempView('example')

# Run a SQL query of the average flight duration
avg_val= spark.sql('SELECT avg(one) from example').collect()[0]
print('The average value is: %d' % avg_val)

The average value is: 33


In [10]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("dataCleansing").getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x00000133B35001C0>


In [11]:
voter_df = spark.read.format('csv').options(Header=True).load('data/DallasCouncilVoters.csv.gz')
voter_df.show()

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember|       Scott Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember| Rickey D. Callahan|
|01/11/2017|Councilmember|  Jennifer S. Gates|
|04/25/2018|C

In [12]:
voter_df.select(voter_df['VOTER_NAME']).distinct().show(10, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|VOTER_NAME                                                                                                                                                                                                                                                                                                                                                                                                   |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

- VOTER NAME의 길이가 0보다 크고, 20보다 작은 값을 가져온다. 
- 그런데, 여전히 011018__42와 같은 숫자가 존재하는 것을 볼 수 있다. 

In [13]:
# Filter voter_df where the VOTER_NAME is 1-20 characters in length
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')
voter_df.select('VOTER_NAME').distinct().show(10, truncate=False)

+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
|Kevin Felder       |
|Adam Medrano       |
|Casey  Thomas      |
|011018__42         |
|Mark  Clayton      |
+-------------------+
only showing top 10 rows



In [14]:
# 밑줄이 그어진 행은 제거하도록 한다. 
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))
voter_df.select('VOTER_NAME').distinct().show(10, truncate=False)

+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
|Kevin Felder       |
|Adam Medrano       |
|Casey  Thomas      |
|Mark  Clayton      |
|Casey Thomas       |
+-------------------+
only showing top 10 rows



# Modifying DataFrame Columns

In [15]:
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, '\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show()

+----------+-------------+-------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|
+----------+-------------+-------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|       Lee| Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|     Sandy|  Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|  



# when() example

In [18]:
import pyspark.sql.functions as F

# Add a column to voter_df for any voter with the title **Councilmember**
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand()))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()

+----------+-------------+-------------------+----------+---------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|          random_val|
+----------+-------------+-------------------+----------+---------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|  0.6613344991615496|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|  0.8831486414627607|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                null|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|  0.5965610424063875|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|  0.9205612437329433|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|  0.5514369352846538|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs|  0.1372925714120783|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough|  0.5785345810580993|
|02/08/2017|Councilme

In [21]:
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))

# Show some of the DataFrame rows
voter_df.show()

+----------+-------------+-------------------+----------+---------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|         random_val|
+----------+-------------+-------------------+----------+---------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates| 0.7072139201878489|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.3753947020828933|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                2.0|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|0.45078844143021557|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|0.07096838646638959|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold| 0.4289940203955551|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs| 0.2334369570540703|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough| 0.2779033617679124|
|02/08/2017|Councilmember|      

In [22]:
# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val == 0).show()

+----------+--------------------+-----------------+----------+---------+----------+
|      DATE|               TITLE|       VOTER_NAME|first_name|last_name|random_val|
+----------+--------------------+-----------------+----------+---------+----------+
|04/25/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|04/25/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|06/20/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|06/20/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|06/20/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|06/20/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|08/15/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|08/15/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|09/18/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|    

# Using user defined functions in Spark

In [28]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def getFirstAndMiddle(names):
  # Return a space separated string of names
  return ' '.join(names[:-1])

# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, '\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Show the DataFrame
voter_df.show()

+----------+-------------+-------------------+----------+---------+-------------------+--------------------+---------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|         random_val|              splits|first_and_middle_name|
+----------+-------------+-------------------+----------+---------+-------------------+--------------------+---------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates| 0.7072139201878489|[Jennifer, S., Ga...|          Jennifer S.|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.3753947020828933|[Philip, T., King...|            Philip T.|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                2.0|[Michael, S., Raw...|           Michael S.|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|0.45078844143021557|     [Adam, Medrano]|                 Adam|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|0.07096838646638

# Adding an ID field

In [31]:
df = spark.read.format('csv').options(Header=True).load('data/DallasCouncilVoters.csv.gz')
df.show()

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember|       Scott Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember| Rickey D. Callahan|
|01/11/2017|Councilmember|  Jennifer S. Gates|
|04/25/2018|C

In [32]:
# Select all the unique council voters
voter_df = df.select(df["VOTER_NAME"]).distinct()

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)


There are 36 rows in the voter_df DataFrame.

+--------------------+------+
|          VOTER_NAME|ROW_ID|
+--------------------+------+
|        Lee Kleinman|    35|
|  the  final  201...|    34|
|         Erik Wilson|    33|
|  the  final   20...|    32|
| Carolyn King Arnold|    31|
| Rickey D.  Callahan|    30|
|   the   final  2...|    29|
|    Monica R. Alonzo|    28|
|     Lee M. Kleinman|    27|
|   Jennifer S. Gates|    26|
+--------------------+------+
only showing top 10 rows



In [33]:
# Determine the highest ROW_ID and save it in previous_max_ID
previous_max_ID = voter_df.select('ROW_ID').rdd.max()[0]

# Add a ROW_ID column to voter_df_april starting at the desired value
voter_df_2 = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID)

# Show the ROW_ID from both DataFrames and compare
voter_df.select('ROW_ID').show()
voter_df_2.select('ROW_ID').show()

+------+
|ROW_ID|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows

+------+
|ROW_ID|
+------+
|    35|
|    36|
|    37|
|    38|
|    39|
|    40|
|    41|
|    42|
|    43|
|    44|
|    45|
|    46|
|    47|
|    48|
|    49|
|    50|
|    51|
|    52|
|    53|
|    54|
+------+
only showing top 20 rows

