<a href="https://colab.research.google.com/github/OliverRevilla/Spark_Pyspark/blob/main/Cleaning_data_with_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Introduction to data cleaning with Apache Spark

In [None]:
# Schema
# Declare an explicit schema and apply it while reading a CSV file.
# The type classes have to be imported by name (a bare `import pyspark.sql.types`
# leaves StructType etc. undefined), and DataFrameReader.load() takes the file
# location as its first argument (`path`), not as a `name` keyword.
"""
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
peopleSchema = StructType([
    # Define the name field
    StructField('name', StringType(), True),
    # Add the age field
    StructField('age', IntegerType(), True),
    # Add the city field
    StructField('city', StringType(), True)
])

# Read CSV file containing data
people_df = spark.read.format('csv').load('rawdata.csv', schema=peopleSchema)
"""
# Import the pyspark.sql.types library
"""
from pyspark.sql.types import *

# Define a new schema using the StructType class
people_schema = StructType([
  # Define a StructField for each field
  StructField('Name', StringType(), True),
  StructField('Age',IntegerType(),True),
  StructField('City',StringType(),True)
])
"""
"""
# Using lazy processing: these lines only build a plan; nothing is computed
# until an action such as show() runs.
# header=True is needed so the columns get real names (e.g. `year`) instead
# of _c0, _c1, ...
voter_df = spark.read.csv('votedata.csv', header=True)
voter_df = voter_df.withColumn('fullyear', voter_df.year + 2000)
voter_df = voter_df.drop(voter_df.year)

import pyspark.sql.functions as F

# Load the CSV file
aa_dfw_df = spark.read.format('csv').options(Header=True).load('AA_DFW_2018.csv.gz')

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show(3)
"""
# Understanding Parquet

# Reading parquet files
# Two spellings of the same read: the generic reader with format('parquet')
# and the parquet() shortcut.
"""
df = spark.read.format('parquet').load('filename.parquet')
df = spark.read.parquet('filename.parquet')
"""
# Writing parquet files
# Both write forms below target the same path. The default save mode raises an
# error when the path already exists, so mode 'overwrite' is set on each so they
# can run one after the other.
"""
df.write.format('parquet').mode('overwrite').save('filename.parquet')
df.write.parquet('filename.parquet', mode='overwrite')

# View the row count of df1 and df2 (assumed to be loaded beforehand)
print('df1 Count: %d' % df1.count())
print('df2 Count: %d' % df2.count())

# Combine the DataFrames into one
# union() matches columns by position, so df1 and df2 need the same column order
df3 = df1.union(df2)

# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('AA_DFW_ALL.parquet').count())
"""
# Parquet and SQL
"""
df = spark.read.parquet('df.parquet')
df.createOrReplaceTempView('flights')
short_flights_df = spark.sql('SELECT * FROM flights WHERE flightduration < 100')

# Read the Parquet file into flights_df
flights_df = spark.read.parquet('AA_DFW_ALL.parquet')

# Register the temp table
flights_df.createOrReplaceTempView('flights')

# Run a SQL query of the average flight duration.
# collect()[0] is a Row; the extra [0] extracts the scalar average from it.
avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0][0]
# %.2f keeps the fractional part that %d would silently truncate
print('The average flight time is: %.2f' % avg_duration)
"""


Manipulating DataFrames in the real world

In [None]:
# DataFrame column operations
# (F below is pyspark.sql.functions, imported as `import pyspark.sql.functions as F`)

# Return rows where name starts with "M"
"""
voter_df.filter(voter_df.name.like('M%'))

# Return name and position only
voters = voter_df.select('name','position')

# Common dataFrame transformations
# Filter/where (where() is an alias of filter())
# NOTE: if date is a string column, this compares text, not calendar dates
voter_df.filter(voter_df.date > '1/1/2019')
voter_df.where(voter_df.date > '1/1/2019')
# Select
voter_df.select(voter_df.name)
# withColumn: F.year() extracts the year from a date column
voter_df.withColumn('year', F.year(voter_df.date))
# drop
voter_df.drop('unused_column')

### Filtering data
df.filter(df['name'].isNotNull())
df.filter(df['date'] > 1000)
df.where(df['columna'].contains('criteria'))
df.where(df.columna.isNull())
"""
### Column string transformations
"""
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
# Applied per column as transformation
df.withColumn('upper', F.upper('name')) # withColumn('NewColumn', F.function('OldColumn'))
# Can create intermediary columns (split on a space; '' would split between every character)
voter_df.withColumn('splits', F.split('name', ' '))
# Can cast to other types
df.withColumn('year',df['c4'].cast(IntegerType()))
"""
"""
# ArrayType() column functions
F.size(array_col)       # Returns the length of an ArrayType() column
array_col.getItem(i)    # Retrieves the item at index i of an array column

import pyspark.sql.functions as F

# Show the distinct VOTER_NAME entries
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

# Filter voter_df where the VOTER_NAME is 1-20 characters in length
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) <= 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate = False)
"""
r"""
#Exercise
# Add a new column called splits separated on whitespace
# (the regex is a raw string so \s stays literal; this note is raw as well,
# otherwise Python warns about an invalid escape sequence)
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show()
"""
# Conditional DataFrame column operations
# F.when(condition, value)[.when(condition, value) ...][.otherwise(default)]
"""
# Individual when(): rows that match no condition get null
df.select(df.Name, df.Age, F.when(df.Age >= 18, "Adult"))
# Multiple when()
df.select(df.Name, df.Age,
          F.when(df.Age >= 18, "Adult")
          .when(df.Age < 18, "Minor"))

# otherwise() supplies the value for rows that match no condition
df.select(df.Name, df.Age,
          F.when(df.Age >= 18, "Adult")
          .otherwise("Minor"))

"""
"""
# Add a column to voter_df for any voter with the title **Councilmember**
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand()))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()
"""
"""
# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))

# Show some of the DataFrame rows
voter_df.show()

# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val == 0).show()

"""
# User defined functions
# pyspark.sql.functions.udf(python_function, return_type)

"""
from pyspark.sql.types import StringType

def reverseString(mystr):
  return mystr[::-1]

udfReverseString = F.udf(reverseString, StringType()) # Syntax: (function we defined, return data type)

# Use with Spark
user_df = user_df.withColumn('ReverseName', udfReverseString(user_df.Name))
"""
"""
import random

def sortingCap():
  return random.choice(['G','H','R','S'])

# The result is random, so mark the UDF as non-deterministic; otherwise Spark's
# optimizer may assume repeated calls return the same value.
udfSortingCap = F.udf(sortingCap, StringType()).asNondeterministic()
user_df = user_df.withColumn('Class', udfSortingCap())
"""
"""
def getFirstAndMiddle(names):
  # Return a space separated string of every name except the last one
  # (the last entry of splits is the last name)
  return ' '.join(names[:-1])

# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle,StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Show the DataFrame
voter_df.show()
"""
## IDs
# Snippet: build voter_df from the distinct 'VOTER NAME' values, count it, and
# attach a ROW_ID. IDs from F.monotonically_increasing_id() are unique and
# increasing but not necessarily consecutive.
"""
# Select all the unique council voters
voter_df = df.select(df['VOTER NAME']).distinct()

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID',F.monotonically_increasing_id())

# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
"""
# Snippet: compare voter_df with voter_df_single (assumed to be the same data
# with a different partition count -- TODO confirm where it is created) and how
# their generated IDs differ.
"""
# Print the number of partitions in each DataFrame
print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())
print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())

# Add a ROW_ID field to each DataFrame
voter_df = voter_df.withColumn('ROW_ID',F.monotonically_increasing_id())

voter_df_single = voter_df_single.withColumn('ROW_ID',F.monotonically_increasing_id())

# Show the top 10 IDs in each DataFrame
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)

"""
"""
# Determine the highest ROW_ID and save it in previous_max_ID
previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]

# Add a ROW_ID column to voter_df_april starting just past previous_max_ID.
# monotonically_increasing_id() starts at 0, so without the + 1 the first
# April row would reuse March's highest ID.
voter_df_april = voter_df_april.withColumn('ROW_ID', previous_max_ID + 1 + F.monotonically_increasing_id())

# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()
"""

Caching

In [None]:
# Implementing caching
"""
import time
import pyspark.sql.functions as F

df = spark.read.csv('voter_data.txt.gz')
# cache() is lazy; the count() action is what fills the cache
df.cache().count()

df = df.withColumn('ID', F.monotonically_increasing_id())
df = df.cache()
df.show()

# Check if a DataFrame is cached
print(df.is_cached)

# Remove a DataFrame from the cache when it is no longer needed
df.unpersist()

start_time = time.time()

"""
# Add caching to the unique rows in departures_df
# Snippet: time two consecutive count() calls on the cached, de-duplicated
# departures_df. start_time must already be set when it runs (the caching notes
# just before this end by assigning it).
"""
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))
"""
# Snippet: print departures_df.is_cached, call unpersist(), then print it again.
"""
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)
"""
# Optimize
# Snippet: read a CSV once, write it out as Parquet, and read the Parquet copy back.
"""
df_csv = spark.read.csv('df.csv')
df_csv.write.parquet('data.parquet')
df = spark.read.parquet('data.parquet')
"""
# Snippet: time a count() over the full departures file and over one split
# file (departures_000.txt.gz) to compare how long each takes.
"""
# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_000.txt.gz')

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))
"""
# Cluster sizing types
# Reading and changing Spark configuration values at runtime
"""
spark.conf.get('<configuration key>')
spark.conf.set('<configuration key>', value)
"""
"""
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)
"""
"""
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions when shuffling (e.g. for distinct())
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv('departures.txt.gz').distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())
"""
# Performance improvements
"""
voter_df = df.select(df['VOTER NAME']).distinct()
voter_df.explain()
# Shuffling refers to moving data around to various workers to complete a task
# How to limit shuffling?
#   .repartition(num_partitions)  # full shuffle into num_partitions partitions
#   .coalesce(num_partitions)     # fewer partitions without a full shuffle
#   .join()                       # joins can trigger a shuffle
#   broadcast()                   # provides a copy of an object to each worker

from pyspark.sql.functions import broadcast
# Advisable when df_2 is small (also available as F.broadcast). 'key' is a
# placeholder join column: a join without any condition behaves as a cross join.
combined_df = df_1.join(broadcast(df_2), on='key')
"""
"""
# Join the flights_df and airports_df DataFrames
normal_df = flights_df.join(airports_df,\
            flights_df["Destination Airport"] == airports_df["IATA"] )

# Show the query plan
normal_df.explain()
"""
"""
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(broadcast(airports_df), \
    flights_df["Destination Airport"] == airports_df["IATA"] )

# Show the query plan and compare against the original
broadcast_df.explain()
"""

# Notebook output residue: the value echoed by the previous cell, i.e. the
# broadcast-join note string repeated verbatim. Evaluating it has no effect.
'\n# Import the broadcast method from pyspark.sql.functions\nfrom pyspark.sql.functions import broadcast\n\n# Join the flights_df and airports_df DataFrames using broadcasting\nbroadcast_df = flights_df.join(broadcast(airports_df),     flights_df["Destination Airport"] == airports_df["IATA"] )\n\n# Show the query plan and compare against the original\nbroadcast_df.explain()\n'

Quick pipeline

In [None]:
# Quick pipeline snippet: read departures with a header row, keep rows whose
# fourth column (index 3, presumably the duration) is > 0, add an id column
# and write the result as JSON.
"""
# Import the data to a DataFrame
departures_df = spark.read.csv('2015-departures.csv.gz', header=True)

# Remove any duration of 0
departures_df = departures_df.filter(departures_df[3] > 0)

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json('output.json', mode='overwrite')
"""
# Change the type of data: cast the Duration column to an integer
"""
departures_df = departures_df.withColumn('Duration', departures_df['Duration'].cast(IntegerType()))
"""

# Removing blank lines, headers, and comments
# Removing comments: comment='#' makes the reader skip lines beginning with '#'
"""
df1 = spark.read.csv('datafile.csv.gz', comment = '#')
"""
# Automatic column creation: each line is split on `sep` into columns _c0, _c1, ...
"""
df1 = spark.read.csv('df.csv.gz', sep = ',')
df1 = spark.read.csv('df.csv.gz', sep = '*')
"""
"""
# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.where(F.col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))
"""
"""
# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Record the row count before filtering, for the comparison printed below
initial_count = annotations_df.count()

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (F.col('colcount') < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))
"""
"""
# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df['_c0'],'\t')

# Add the columns folder, filename, width, and height
# (split() yields strings, so width and height are still strings here)
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width',split_cols.getItem(2))
split_df = split_df.withColumn('height',split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)
"""
"""
def retriever(cols,colcount):
  # Return a list of dog data: the fields from index 4 up to colcount
  return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever,ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df['split_cols'],split_df['colcount']))

# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('split_cols').drop('colcount')
"""
# Validation

#Validating via joins
# NOTE(review): unlike the surrounding notes, these three lines are live code and
# run with the cell; they need both Parquet files to exist. Joining on a column
# expression (rather than on the name 'company') likely keeps both company
# columns in the result -- confirm.
parsed_df = spark.read.parquet('parsed_data.parquet')
company_df = spark.read.parquet('companies.parquet')
verfied_df = parsed_df.join(company_df, parsed_df.company == company_df.company) # this is an inner join

"""
# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0','folder')

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames
joined_df = split_df.join(valid_folders_df, "folder")

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))
"""
"""
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows: a left_anti join keeps only
# the split_df rows whose folder has no match in joined_df
invalid_df = split_df.join(F.broadcast(joined_df), 'folder', 'left_anti')

# Validate the count of the new DataFrame is as expected
# (split_count should equal joined_count + invalid_count when the valid
# folders are unique)
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)
"""
# Final Analysis and delivery
"""
from pyspark.sql.types import DoubleType

def getAvgSale(saleslist):
  # Average of fields 2 and 3 over every sale (each sale contributes two values)
  totalsales = 0
  count = 0
  for sale in saleslist or []:
    totalsales += sale[2] + sale[3]
    count += 2
  # An empty or null list would otherwise raise ZeroDivisionError inside the UDF
  return totalsales/count if count else None
udfGetAvgSale = F.udf(getAvgSale, DoubleType())
df = df.withColumn('avg_sale',udfGetAvgSale(df.sales_list))
"""
"""
# Select the dog details and show 10 untruncated rows
# (show() prints the rows itself and returns None, so it is not wrapped in print())
joined_df.select('dog_list').show(10, truncate=False)

# Define a schema type for the details in the dog list
DogType = StructType([
    StructField('breed',StringType(),False),
    StructField('start_x',IntegerType(),False),
    StructField('start_y',IntegerType(),False),
    StructField('end_x',IntegerType(),False),
    StructField('end_y',IntegerType(),False)
])
"""
# Snippet: parse each comma-separated dog string into a (breed, start_x,
# start_y, end_x, end_y) tuple with a UDF returning ArrayType(DogType), replace
# dog_list with the parsed dogs column, and show how many dogs each row has.
"""
# Create a function to return the number and type of dogs as a tuple
def dogParse(doglist):
  dogs = []
  for dog in doglist:
    (breed, start_x, start_y, end_x, end_y) = dog.split(',')
    dogs.append((breed, int(start_x), int(start_y), int(end_x),int(end_y)))
  return dogs

# Create a UDF
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to list of dogs and drop the old column
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)
"""
# Snippet: sum each dog's box area (end_x - start_x) * (end_y - start_y), express
# it as a percentage of width * height, and show rows with more than 60% dog.
"""
# Define a UDF to determine the number of pixels per image
def dogPixelCount(doglist):
  totalpixels = 0
  for dog in doglist:
    totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
  return totalpixels

# Define a UDF for the pixel count
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount('dogs'))

# Create a column representing the percentage of pixels
joined_df = joined_df.withColumn('dog_percent', (joined_df.dog_pixels / (joined_df.width * joined_df.height)) * 100)

# Show the first 10 annotations with more than 60% dog
joined_df.where('dog_percent > 60').show(10)
"""
