In [1]:
from pyspark.sql import SparkSession

# getOrCreate() hands back an already-running session if there is one,
# otherwise it starts a new session under the application name 'stads'.
spark = (
    SparkSession.builder
    .appName('stads')
    .getOrCreate()
)

24/06/21 17:37:55 WARN Utils: Your hostname, marwen resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlp3s0)
24/06/21 17:37:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/21 17:37:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Echo the session object so the notebook renders it as this cell's output.
spark

In [3]:
# Load the stadium list: the first row supplies the column names, column types
# are inferred, and multiLine=True allows a field to contain line breaks.
df = spark.read.csv(
    'stads.csv',
    inferSchema=True,
    header=True,
    multiLine=True,
)

                                                                                

In [4]:
# Check which class spark.read.csv handed back.
type(df)

pyspark.sql.dataframe.DataFrame

In [5]:
# Echo the DataFrame's repr: the column names together with their types.
df

DataFrame[Stadium: string, Seating capacity: string, Region: string, Country: string, City: string, Images: string, Home team(s): string]

In [6]:
# Print the first rows as a text table to eyeball the raw, uncleaned values.
df.show()

+--------------------+----------------+----------------+----------------+--------------------+------+--------------------+
|             Stadium|Seating capacity|          Region|         Country|                City|Images|        Home team(s)|
+--------------------+----------------+----------------+----------------+--------------------+------+--------------------+
|Rungrado 1st of M...|      114,000[1]|       East Asia|     North Korea|         Pyongyang\n|  NULL|Korea DPR nationa...|
|    Michigan Stadium|      107,601[2]|   North America|   United States|Ann Arbor, Michig...|    \n|Michigan Wolverin...|
|        Ohio Stadium|      102,780[3]|   North America|   United States|      Columbus, Ohio|  NULL|Ohio State Buckey...|
|Melbourne Cricket...|      100,024[4]|         Oceania|       Australia| Melbourne, Victoria|  NULL|Australia nationa...|
|          Camp Nou ♦|       99,354[5]|          Europe|           Spain|Barcelona, Catalo...|  NULL|      FC Barcelona\n|
|    Estadio Azt

In [7]:
# Print each column's name, type and nullability as a tree.
df.printSchema()

root
 |-- Stadium: string (nullable = true)
 |-- Seating capacity: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Images: string (nullable = true)
 |-- Home team(s): string (nullable = true)



In [8]:
# transformation functions

def clean_text(text):
    """Strip scraping artefacts from a single text value.

    Removes every newline and every '♦' marker, then trims whitespace from
    both ends.

    Args:
        text: The raw string, or None for a missing value.

    Returns:
        The cleaned string. Falsy input (None or '') is returned unchanged.
    """
    import re

    # nothing to clean for a missing or empty value
    if not text:
        return text

    # drop newlines and the '♦' marker in one pass
    without_artefacts = text.replace('\n', '').replace('♦', '')

    # remove leading and trailing whitespace
    return re.sub(r'^\s+|\s+$', '', without_artefacts)


def clean_integer(string):
    """Reduce a scraped number such as '114,000[1]' to its bare digits.

    Removes every bracketed marker ('[1]', '[2]', ...) and every comma used
    as a thousands separator. The result is still a string; converting it to
    a numeric type is left to the caller.

    Args:
        string: The raw value, or None for a missing value.

    Returns:
        The cleaned string, or None when the input is None.
    """
    import re

    # A NULL cell reaches a Python UDF as None; re.sub would raise TypeError
    # on it, so missing values are passed through untouched.
    if string is None:
        return None

    # non-greedy so that each '[...]' marker is removed on its own
    string = re.sub(r'\[.*?\]', '', string)

    return string.replace(',', '')

In [9]:
# create udfs

from pyspark.sql.functions import udf

from pyspark.sql.types import StringType

# Wrap the plain Python helpers so they can be applied to DataFrame columns;
# both UDFs declare StringType as their return type.
clean_text_udf = udf(f=clean_text, returnType=StringType())

clean_integer_udf = udf(f=clean_integer, returnType=StringType())

In [10]:
# apply transformations to the df

# Images is left untouched; every other column goes through the text cleaner
for column in (name for name in df.columns if name != 'Images'):
    df = df.withColumn(column, clean_text_udf(df[column]))

# Seating capacity additionally loses its bracketed markers and commas
df = df.withColumn(
    'Seating capacity',
    clean_integer_udf(df['Seating capacity']),
)

df.show()

[Stage 3:>                                                          (0 + 1) / 1]

+--------------------+----------------+--------------+-------------+--------------------+------+--------------------+
|             Stadium|Seating capacity|        Region|      Country|                City|Images|        Home team(s)|
+--------------------+----------------+--------------+-------------+--------------------+------+--------------------+
|Rungrado 1st of M...|          114000|     East Asia|  North Korea|           Pyongyang|  NULL|Korea DPR nationa...|
|    Michigan Stadium|          107601| North America|United States| Ann Arbor, Michigan|    \n|Michigan Wolverin...|
|        Ohio Stadium|          102780| North America|United States|      Columbus, Ohio|  NULL|Ohio State Buckey...|
|Melbourne Cricket...|          100024|       Oceania|    Australia| Melbourne, Victoria|  NULL|Australia nationa...|
|            Camp Nou|           99354|        Europe|        Spain|Barcelona, Catalonia|  NULL|        FC Barcelona|
|      Estadio Azteca|           95500| North America|  

                                                                                

In [12]:
# convert Seating capacity to integer
from pyspark.sql.functions import col

# Replace the cleaned digit strings with their integer values, keeping the
# column name unchanged.
df = df.withColumn(
    "Seating capacity",
    col("Seating capacity").cast('integer'),
)

df.show()

+--------------------+----------------+--------------+-------------+--------------------+------+--------------------+
|             Stadium|Seating capacity|        Region|      Country|                City|Images|        Home team(s)|
+--------------------+----------------+--------------+-------------+--------------------+------+--------------------+
|Rungrado 1st of M...|          114000|     East Asia|  North Korea|           Pyongyang|  NULL|Korea DPR nationa...|
|    Michigan Stadium|          107601| North America|United States| Ann Arbor, Michigan|    \n|Michigan Wolverin...|
|        Ohio Stadium|          102780| North America|United States|      Columbus, Ohio|  NULL|Ohio State Buckey...|
|Melbourne Cricket...|          100024|       Oceania|    Australia| Melbourne, Victoria|  NULL|Australia nationa...|
|            Camp Nou|           99354|        Europe|        Spain|Barcelona, Catalonia|  NULL|        FC Barcelona|
|      Estadio Azteca|           95500| North America|  

In [13]:
# Re-check the schema to confirm the type of Seating capacity after the cast.
df.printSchema()

root
 |-- Stadium: string (nullable = true)
 |-- Seating capacity: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Images: string (nullable = true)
 |-- Home team(s): string (nullable = true)



In [14]:
# Write the cleaned table as CSV with a header row. mode='overwrite' replaces
# the 'output' directory left by a previous run; the default save mode raises
# when the path already exists, so re-running the notebook would fail here.
df.write.csv('output', mode='overwrite', header=True)

                                                                                