In [None]:
# load data

df = spark.read.csv(path="dbfs:/FileStore/coaster_db.csv",
                   header=True,
                   sep=",")
df.count()

Out[58]: 1087

## Step 1: Understand the Data
- Check the shape of the data
- Peek at sample data with df.head() or df.describe() to get stats about the numeric columns
- Get a list of columns with df.columns
- Check data types of columns with df.dtypes

In [None]:
df.describe()

Out[59]: DataFrame[summary: string, coaster_name: string, Length: string, Speed: string, Location: string, Status: string, Opening date: string, Type: string, Manufacturer: string, Height restriction: string, Model: string, Height: string, Inversions: string, Lift/launch system: string, Cost: string, Trains: string, Park section: string, Duration: string, Capacity: string, G-force: string, Designer: string, Max vertical angle: string, Drop: string, Soft opening date: string, Fast Lane available: string, Replaced: string, Track layout: string, Fastrack available: string, Soft opening date.1: string, Closing date: string, Opened: string, Replaced by: string, Website: string, Flash Pass Available32: string, Must transfer from wheelchair: string, Theme: string, Single rider line available: string, Restraint Style: string, Flash Pass available37: string, Acceleration: string, Restraints: string, Name: string, year_introduced: string, latitude: string, longitude: string, Type_Main: string, o

In [None]:
df.columns

Out[60]: ['coaster_name',
 'Length',
 'Speed',
 'Location',
 'Status',
 'Opening date',
 'Type',
 'Manufacturer',
 'Height restriction',
 'Model',
 'Height',
 'Inversions',
 'Lift/launch system',
 'Cost',
 'Trains',
 'Park section',
 'Duration',
 'Capacity',
 'G-force',
 'Designer',
 'Max vertical angle',
 'Drop',
 'Soft opening date',
 'Fast Lane available',
 'Replaced',
 'Track layout',
 'Fastrack available',
 'Soft opening date.1',
 'Closing date',
 'Opened',
 'Replaced by',
 'Website',
 'Flash Pass Available32',
 'Must transfer from wheelchair',
 'Theme',
 'Single rider line available',
 'Restraint Style',
 'Flash Pass available37',
 'Acceleration',
 'Restraints',
 'Name',
 'year_introduced',
 'latitude',
 'longitude',
 'Type_Main',
 'opening_date_clean',
 'speed1',
 'speed2',
 'speed1_value',
 'speed1_unit',
 'speed_mph',
 'height_value',
 'height_unit',
 'height_ft',
 'Inversions_clean',
 'Gforce_clean']

In [None]:
df.printSchema()

root
 |-- coaster_name: string (nullable = true)
 |-- Length: string (nullable = true)
 |-- Speed: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Opening date: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Manufacturer: string (nullable = true)
 |-- Height restriction: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- Inversions: string (nullable = true)
 |-- Lift/launch system: string (nullable = true)
 |-- Cost: string (nullable = true)
 |-- Trains: string (nullable = true)
 |-- Park section: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Capacity: string (nullable = true)
 |-- G-force: string (nullable = true)
 |-- Designer: string (nullable = true)
 |-- Max vertical angle: string (nullable = true)
 |-- Drop: string (nullable = true)
 |-- Soft opening date: string (nullable = true)
 |-- Fast Lane available: string (nullable = 

In [None]:
df.dtypes

Out[62]: [('coaster_name', 'string'),
 ('Length', 'string'),
 ('Speed', 'string'),
 ('Location', 'string'),
 ('Status', 'string'),
 ('Opening date', 'string'),
 ('Type', 'string'),
 ('Manufacturer', 'string'),
 ('Height restriction', 'string'),
 ('Model', 'string'),
 ('Height', 'string'),
 ('Inversions', 'string'),
 ('Lift/launch system', 'string'),
 ('Cost', 'string'),
 ('Trains', 'string'),
 ('Park section', 'string'),
 ('Duration', 'string'),
 ('Capacity', 'string'),
 ('G-force', 'string'),
 ('Designer', 'string'),
 ('Max vertical angle', 'string'),
 ('Drop', 'string'),
 ('Soft opening date', 'string'),
 ('Fast Lane available', 'string'),
 ('Replaced', 'string'),
 ('Track layout', 'string'),
 ('Fastrack available', 'string'),
 ('Soft opening date.1', 'string'),
 ('Closing date', 'string'),
 ('Opened', 'string'),
 ('Replaced by', 'string'),
 ('Website', 'string'),
 ('Flash Pass Available32', 'string'),
 ('Must transfer from wheelchair', 'string'),
 ('Theme', 'string'),
 ('Single ride

## Step 2: Prepare the Data
- Subset the dataframe by dropping irrelevant columns and rows
- Identify duplicate columns
- Rename columns
- Create feature

In [None]:
# sample column drop
df.drop(df.Length)

Out[63]: DataFrame[coaster_name: string, Speed: string, Location: string, Status: string, Opening date: string, Type: string, Manufacturer: string, Height restriction: string, Model: string, Height: string, Inversions: string, Lift/launch system: string, Cost: string, Trains: string, Park section: string, Duration: string, Capacity: string, G-force: string, Designer: string, Max vertical angle: string, Drop: string, Soft opening date: string, Fast Lane available: string, Replaced: string, Track layout: string, Fastrack available: string, Soft opening date.1: string, Closing date: string, Opened: string, Replaced by: string, Website: string, Flash Pass Available32: string, Must transfer from wheelchair: string, Theme: string, Single rider line available: string, Restraint Style: string, Flash Pass available37: string, Acceleration: string, Restraints: string, Name: string, year_introduced: string, latitude: string, longitude: string, Type_Main: string, opening_date_clean: string, speed1

In [None]:
df.columns

Out[64]: ['coaster_name',
 'Length',
 'Speed',
 'Location',
 'Status',
 'Opening date',
 'Type',
 'Manufacturer',
 'Height restriction',
 'Model',
 'Height',
 'Inversions',
 'Lift/launch system',
 'Cost',
 'Trains',
 'Park section',
 'Duration',
 'Capacity',
 'G-force',
 'Designer',
 'Max vertical angle',
 'Drop',
 'Soft opening date',
 'Fast Lane available',
 'Replaced',
 'Track layout',
 'Fastrack available',
 'Soft opening date.1',
 'Closing date',
 'Opened',
 'Replaced by',
 'Website',
 'Flash Pass Available32',
 'Must transfer from wheelchair',
 'Theme',
 'Single rider line available',
 'Restraint Style',
 'Flash Pass available37',
 'Acceleration',
 'Restraints',
 'Name',
 'year_introduced',
 'latitude',
 'longitude',
 'Type_Main',
 'opening_date_clean',
 'speed1',
 'speed2',
 'speed1_value',
 'speed1_unit',
 'speed_mph',
 'height_value',
 'height_unit',
 'height_ft',
 'Inversions_clean',
 'Gforce_clean']

In [None]:
df.drop(
# 'coaster_name',
 'Length',
 'Speed',
 # 'Location',
 # 'Status',
 'Opening date',
 'Type',
 # 'Manufacturer',
 'Height restriction',
 'Model',
 'Height',
 'Inversions',
 'Lift/launch system',
 'Cost',
 'Trains',
 'Park section',
 'Duration',
 'Capacity',
 'G-force',
 'Designer',
 'Max vertical angle',
 'Drop',
 'Soft opening date',
 'Fast Lane available',
 'Replaced',
 'Track layout',
 'Fastrack available',
 'Soft opening date.1',
 'Closing date',
 'Opened',
 'Replaced by',
 'Website',
 'Flash Pass Available32',
 'Must transfer from wheelchair',
 'Theme',
 'Single rider line available',
 'Restraint Style',
 'Flash Pass available37',
 'Acceleration',
 'Restraints',
 'Name',
 # 'year_introduced',
 # 'latitude',
 # 'longitude',
 # 'Type_Main',
 # 'opening_date_clean',
 'speed1',
 'speed2',
 'speed1_value',
 'speed1_unit',
 # 'speed_mph',
 'height_value',
 'height_unit',
 # 'height_ft',
 # 'Inversions_clean',
 # 'Gforce_clean'
).columns

Out[65]: ['coaster_name',
 'Location',
 'Status',
 'Manufacturer',
 'year_introduced',
 'latitude',
 'longitude',
 'Type_Main',
 'opening_date_clean',
 'speed_mph',
 'height_ft',
 'Inversions_clean',
 'Gforce_clean']

In [None]:
# make the changes permenant by overwriting the existing df
df = df.drop(
# 'coaster_name',
 'Length',
 'Speed',
 # 'Location',
 # 'Status',
 'Opening date',
 'Type',
 # 'Manufacturer',
 'Height restriction',
 'Model',
 'Height',
 'Inversions',
 'Lift/launch system',
 'Cost',
 'Trains',
 'Park section',
 'Duration',
 'Capacity',
 'G-force',
 'Designer',
 'Max vertical angle',
 'Drop',
 'Soft opening date',
 'Fast Lane available',
 'Replaced',
 'Track layout',
 'Fastrack available',
 'Soft opening date.1',
 'Closing date',
 'Opened',
 'Replaced by',
 'Website',
 'Flash Pass Available32',
 'Must transfer from wheelchair',
 'Theme',
 'Single rider line available',
 'Restraint Style',
 'Flash Pass available37',
 'Acceleration',
 'Restraints',
 'Name',
 # 'year_introduced',
 # 'latitude',
 # 'longitude',
 # 'Type_Main',
 # 'opening_date_clean',
 'speed1',
 'speed2',
 'speed1_value',
 'speed1_unit',
 # 'speed_mph',
 'height_value',
 'height_unit',
 # 'height_ft',
 # 'Inversions_clean',
 # 'Gforce_clean'
 )


In [None]:
df.printSchema()

root
 |-- coaster_name: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Manufacturer: string (nullable = true)
 |-- year_introduced: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- Type_Main: string (nullable = true)
 |-- opening_date_clean: string (nullable = true)
 |-- speed_mph: string (nullable = true)
 |-- height_ft: string (nullable = true)
 |-- Inversions_clean: string (nullable = true)
 |-- Gforce_clean: string (nullable = true)



In [None]:
# check the first few rows to determine which columns need to be type casted
df.show()

+--------------------+--------------------+---------+--------------------+---------------+--------+---------+---------+------------------+---------+---------+----------------+------------+
|        coaster_name|            Location|   Status|        Manufacturer|year_introduced|latitude|longitude|Type_Main|opening_date_clean|speed_mph|height_ft|Inversions_clean|Gforce_clean|
+--------------------+--------------------+---------+--------------------+---------------+--------+---------+---------+------------------+---------+---------+----------------+------------+
|  Switchback Railway|        Coney Island|  Removed|LaMarcus Adna Tho...|           1884|  40.574|  -73.978|     Wood|        1884-06-16|      6.0|     null|               0|         2.9|
|   Flip Flap Railway|       Sea Lion Park|  Removed|        Lina Beecher|           1895|  40.578|  -73.979|     Wood|        1895-01-01|     null|     null|               1|        12.0|
|Switchback Railwa...|Cleveland, Ohio, ...|   Closed|  

In [None]:
# import col 
from pyspark.sql.functions import col 

# convert the float columns to float type
cols = [ 'latitude', 'longitude', 'speed_mph', 'height_ft', 'Gforce_clean']

for col_name in cols:
    df = df.withColumn(col_name, col(col_name).cast('float'))

In [None]:
# check column type
df.printSchema()

root
 |-- coaster_name: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Manufacturer: string (nullable = true)
 |-- year_introduced: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- Type_Main: string (nullable = true)
 |-- opening_date_clean: string (nullable = true)
 |-- speed_mph: float (nullable = true)
 |-- height_ft: float (nullable = true)
 |-- Inversions_clean: string (nullable = true)
 |-- Gforce_clean: float (nullable = true)



In [None]:
# expand the logic to include other columns that need to be type casted

col_dict ={
    "float_cols": [ 'latitude', 'longitude', 'speed_mph', 'height_ft', 'Gforce_clean'],
    "int_cols": ['Inversions_clean'],
    "date_cols": ['year_introduced']
}

for k,v in col_dict.items():
    if k == "float_cols":
        for col_name in col_dict["float_cols"]:
            df = df.withColumn(col_name, col(col_name).cast('float'))
    elif k == "int_cols":
        for col_name in col_dict["int_cols"]:
            df = df.withColumn(col_name, col(col_name).cast('int'))
    elif k == "date_cols":
        for col_name in col_dict["date_cols"]:
            df = df.withColumn(col_name, col(col_name).cast('date'))



In [None]:
df.printSchema()

root
 |-- coaster_name: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Manufacturer: string (nullable = true)
 |-- year_introduced: date (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- Type_Main: string (nullable = true)
 |-- opening_date_clean: string (nullable = true)
 |-- speed_mph: float (nullable = true)
 |-- height_ft: float (nullable = true)
 |-- Inversions_clean: integer (nullable = true)
 |-- Gforce_clean: float (nullable = true)



In [None]:
# create the logic for Pacal Casing column names and removing words such as clean or Clean
s = 'year_introduced'
s = s.split('_')
s = ''.join(word.title() for word in s if word not in ['clean', 'Clean'])
s

Out[75]: 'YearIntroduced'

In [None]:
# apply the renaming logic

def rename_columns(col_name):
    col_name = col_name.split("_")
    col_name = "".join(word.title() for word in col_name if word not in ['clean', 'Clean'])
    return col_name

col_names = [rename_columns(name) for name in df.columns]
col_names

Out[77]: ['CoasterName',
 'Location',
 'Status',
 'Manufacturer',
 'YearIntroduced',
 'Latitude',
 'Longitude',
 'TypeMain',
 'OpeningDate',
 'SpeedMph',
 'HeightFt',
 'Inversions',
 'Gforce']

In [None]:
# rename the columns by overwriting the existing df 
df = df.toDF(*col_names)


df.printSchema()

root
 |-- CoasterName: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Manufacturer: string (nullable = true)
 |-- YearIntroduced: date (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Longitude: float (nullable = true)
 |-- TypeMain: string (nullable = true)
 |-- OpeningDate: string (nullable = true)
 |-- SpeedMph: float (nullable = true)
 |-- HeightFt: float (nullable = true)
 |-- Inversions: integer (nullable = true)
 |-- Gforce: float (nullable = true)



### Task: Identify Missing Values

In [None]:
# using built-in isNull() method

null_dict = {col:df.filter(df[col].isNull()).count() for col in df.columns}
null_dict


Out[87]: {'CoasterName': 0,
 'Location': 0,
 'Status': 213,
 'Manufacturer': 59,
 'YearIntroduced': 0,
 'Latitude': 275,
 'Longitude': 275,
 'TypeMain': 0,
 'OpeningDate': 250,
 'SpeedMph': 150,
 'HeightFt': 916,
 'Inversions': 0,
 'Gforce': 725}

In [None]:
# using select and where
df.select('Manufacturer').where('Manufacturer IS NULL').count()

Out[97]: 59

In [None]:
# apply it to the entire df

null_sql_dict = dict()

for col_name in df.columns:
    null_count = df.select(col_name).where(f"{col_name} IS NULL").count()
    null_sql_dict[col_name] = null_count

null_sql_dict

Out[115]: {'CoasterName': 0,
 'Location': 0,
 'Status': 213,
 'Manufacturer': 59,
 'YearIntroduced': 0,
 'Latitude': 275,
 'Longitude': 275,
 'TypeMain': 0,
 'OpeningDate': 250,
 'SpeedMph': 150,
 'HeightFt': 916,
 'Inversions': 0,
 'Gforce': 725}

### Task: Identify if there is any duplicated rows

In [None]:
df.count() == df.dropDuplicates().count()

Out[114]: True

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

df.withColumn("id",monotonically_increasing_id()).where("id == 20").show()

+--------------------+-----------------+------+----------------+--------------+--------+---------+--------+-----------+--------+--------+----------+------+---+
|         CoasterName|         Location|Status|    Manufacturer|YearIntroduced|Latitude|Longitude|TypeMain|OpeningDate|SpeedMph|HeightFt|Inversions|Gforce| id|
+--------------------+-----------------+------+----------------+--------------+--------+---------+--------+-----------+--------+--------+----------+------+---+
|The Wild One (rol...|Six Flags America|  null|Dinn Corporation|    1917-01-01| 38.9092| -76.7725|    Wood|       null|    53.0|    null|         0|  null| 20|
+--------------------+-----------------+------+----------------+--------------+--------+---------+--------+-----------+--------+--------+----------+------+---+



In [None]:
# drop duplicate rows from a subset of columns
# -> CoasterName, Location, OpeningDate
df = df.dropDuplicates(subset=['CoasterName', 'Location', 'OpeningDate'])


df.count()



Out[138]: 990

## Step 3: Data Visualization

### Task: Visualize the top 10 years coasters were introduced

In [None]:
df.groupBy('YearIntroduced').count().show()

+--------------+-----+
|YearIntroduced|count|
+--------------+-----+
|    1982-01-01|    7|
|    2009-01-01|   21|
|    2016-01-01|   17|
|    1933-01-01|    1|
|    1975-01-01|    7|
|    1910-01-01|    1|
|    1979-01-01|    9|
|    1967-01-01|    2|
|    1998-01-01|   30|
|    1929-01-01|    1|
|    2008-01-01|   27|
|    2011-01-01|   24|
|    1981-01-01|   11|
|    1976-01-01|   16|
|    1969-01-01|    3|
|    1927-01-01|    6|
|    1934-01-01|    1|
|    2017-01-01|   17|
|    1912-01-01|    2|
|    1935-01-01|    2|
+--------------+-----+
only showing top 20 rows



In [None]:
result = df.groupBy('YearIntroduced').count().sort('count', ascending=False).show(10)
display(result)

+--------------+-----+
|YearIntroduced|count|
+--------------+-----+
|    1999-01-01|   46|
|    2000-01-01|   45|
|    1998-01-01|   30|
|    2001-01-01|   29|
|    2002-01-01|   28|
|    2008-01-01|   27|
|    2004-01-01|   25|
|    2011-01-01|   24|
|    2007-01-01|   24|
|    2006-01-01|   23|
+--------------+-----+
only showing top 10 rows



In [None]:
# save df as Hive Table
df.write.mode('overwrite').saveAsTable('coasters')

spark.sql('select count(*) from coasters').show()

+--------+
|count(1)|
+--------+
|     990|
+--------+



In [None]:
spark.sql('DESCRIBE coasters').show()

+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|   CoasterName|   string|   null|
|      Location|   string|   null|
|        Status|   string|   null|
|  Manufacturer|   string|   null|
|YearIntroduced|     date|   null|
|      Latitude|    float|   null|
|     Longitude|    float|   null|
|      TypeMain|   string|   null|
|   OpeningDate|   string|   null|
|      SpeedMph|    float|   null|
|      HeightFt|    float|   null|
|    Inversions|      int|   null|
|        Gforce|    float|   null|
+--------------+---------+-------+



In [None]:
spark.sql('SELECT YearIntroduced, COUNT(YearIntroduced) \
            FROM coasters \
            GROUP BY YearIntroduced \
            SORT BY COUNT(YearIntroduced) DESC  \
            ').show(10)

+--------------+---------------------+
|YearIntroduced|count(YearIntroduced)|
+--------------+---------------------+
|    1999-01-01|                   46|
|    2000-01-01|                   45|
|    1998-01-01|                   30|
|    2001-01-01|                   29|
|    2002-01-01|                   28|
|    2008-01-01|                   27|
|    2004-01-01|                   25|
|    2011-01-01|                   24|
|    2007-01-01|                   24|
|    2006-01-01|                   23|
+--------------+---------------------+
only showing top 10 rows



In [None]:
query = '''SELECT YearIntroduced, COUNT(YearIntroduced)
        FROM coasters
        GROUP BY YearIntroduced
        ORDER BY COUNT(YearIntroduced) DESC 
        LIMIT 10
        '''
output = spark.sql(query)
display(output)

YearIntroduced,count(YearIntroduced)
1999-01-01,46
2000-01-01,45
1998-01-01,30
2001-01-01,29
2002-01-01,28
2008-01-01,27
2004-01-01,25
2011-01-01,24
2007-01-01,24
2012-01-01,23


Output can only be rendered in Databricks

Output can only be rendered in Databricks

### Task: Plot the distribution of Coaster Speed

In [None]:
query = """ SELECT SpeedMph, COUNT(SpeedMph) AS CoasterCount
            FROM coasters
            GROUP BY SpeedMph
            ORDER BY COUNT(SpeedMph) DESC
        """
output = spark.sql(query)
display(output)

SpeedMph,CoasterCount
50.0,54
55.0,38
40.0,36
45.0,31
49.7,28
35.0,28
60.0,21
55.9,20
47.0,19
70.0,16


Output can only be rendered in Databricks

### Task: Plot the distribution of Coasters based on coaster material - TypeMain

In [None]:
query =""" SELECT TypeMain, COUNT(TypeMain)
        FROM coasters
        GROUP BY TypeMain
        """

output = spark.sql(query)
display(output)

TypeMain,count(TypeMain)
Steel,728
Wood,191
Other,71


Output can only be rendered in Databricks

### Task: Display the relationship between Coaster Speed and Coaster Height

In [None]:
query = """ SELECT SpeedMph, HeightFt
            FROM coasters
        """

output = spark.sql(query)
display(output)

SpeedMph,HeightFt
45.0,
52.8,98.4
62.1,
37.3,65.6
55.9,
45.0,
35.0,
,
62.0,
45.0,


Output can only be rendered in Databricks

### Task: List the locations with the fastest roller coasters AND minimum 10 coaster in operation

In [None]:
df.columns

Out[190]: ['CoasterName',
 'Location',
 'Status',
 'Manufacturer',
 'YearIntroduced',
 'Latitude',
 'Longitude',
 'TypeMain',
 'OpeningDate',
 'SpeedMph',
 'HeightFt',
 'Inversions',
 'Gforce']

In [None]:
# get a value count for location
df.select('Location').groupBy('Location').count().show()

+--------------------+-----+
|            Location|count|
+--------------------+-----+
|       Worlds of Fun|   10|
|Nickelodeon Universe|    7|
|      Battersea Park|    1|
|Wonderland Park (...|    1|
|    Fårup Sommerland|    1|
|           Gardaland|    6|
|Sesquicentennial ...|    1|
|Silverwood Theme ...|    3|
|            Efteling|    7|
|     Tokyo DisneySea|    1|
|  Movie Park Germany|    7|
|Primm Valley Resorts|    1|
|          Story Land|    1|
|      Gyeongju World|    1|
|Six Flags Discove...|    5|
|      Tivoli Gardens|    1|
|        Parc Astérix|    4|
|              Lagoon|    3|
|         Thorpe Park|    6|
|      Riverview Park|    2|
+--------------------+-----+
only showing top 20 rows



In [None]:
# sort the results in ascending order
df.select('Location').groupBy('Location').count().sort('count',ascending=False).show()

+--------------------+-----+
|            Location|count|
+--------------------+-----+
|               Other|  181|
|        Kings Island|   19|
|         Cedar Point|   18|
|Six Flags Magic M...|   17|
|         Hersheypark|   16|
|Six Flags Great A...|   14|
|           Carowinds|   14|
| Canada's Wonderland|   13|
|        Alton Towers|   13|
|      Kings Dominion|   12|
|Busch Gardens Wil...|   12|
|Blackpool Pleasur...|   11|
|       Worlds of Fun|   10|
|  Knott's Berry Farm|    9|
|           Kennywood|    9|
|       Morey's Piers|    9|
|Busch Gardens Tam...|    8|
|          Valleyfair|    8|
|          Dreamworld|    8|
|Six Flags Over Ge...|    8|
+--------------------+-----+
only showing top 20 rows



In [None]:
# exclude 'Other' location
df.filter('Location != "Other"')\
.select('Location')\
.groupby('Location')\
.count()\
.sort('count', ascending=False)\
.show()

+--------------------+-----+
|            Location|count|
+--------------------+-----+
|        Kings Island|   19|
|         Cedar Point|   18|
|Six Flags Magic M...|   17|
|         Hersheypark|   16|
|Six Flags Great A...|   14|
|           Carowinds|   14|
| Canada's Wonderland|   13|
|        Alton Towers|   13|
|Busch Gardens Wil...|   12|
|      Kings Dominion|   12|
|Blackpool Pleasur...|   11|
|       Worlds of Fun|   10|
|           Kennywood|    9|
|  Knott's Berry Farm|    9|
|       Morey's Piers|    9|
|Busch Gardens Tam...|    8|
|          Valleyfair|    8|
|          Dreamworld|    8|
|Six Flags Over Ge...|    8|
|          Linnanmäki|    8|
+--------------------+-----+
only showing top 20 rows



In [None]:
query = """SELECT Location, COUNT(Location)
        FROM coasters
        WHERE Location <> 'Other'
        AND SpeedMph IN (SELECT SpeedMph FROM coasters ORDER BY SpeedMph DESC)
        GROUP BY Location
        HAVING COUNT(Location) > 10 
        ORDER BY `COUNT(Location)` DESC
        """

output = spark.sql(query)
display(output)

Location,count(Location)
Kings Island,19
Cedar Point,18
Six Flags Magic Mountain,17
Carowinds,14
Hersheypark,13
Canada's Wonderland,12
Kings Dominion,12
Alton Towers,12
Busch Gardens Williamsburg,11
Six Flags Great Adventure,11


Output can only be rendered in Databricks

In [None]:
query = """SELECT YearIntroduced
        FROM coasters
        GROUP BY YearIntroduced
        ORDER BY COUNT(YearIntroduced) DESC
        LIMIT 3
        """

output = spark.sql(query)
display(output)

YearIntroduced
1999-01-01
2000-01-01
1998-01-01


### TASK: Display the top 3 Manufacturers and Coaster Count built by the Manufacturer in descending order based on the number of coasters they built during the busiest top 3 expansion years.

In [None]:
%sql

SELECT Manufacturer, COUNT(Manufacturer) `Coaster Count`
FROM coasters
WHERE YearIntroduced IN (
  SELECT YearIntroduced
  FROM coasters
  GROUP BY YearIntroduced
  ORDER BY COUNT(YearIntroduced) DESC
  LIMIT 3
  )
GROUP BY Manufacturer
ORDER BY COUNT(Manufacturer) DESC 
LIMIT 3


Manufacturer,Coaster Count
Vekoma,24
Bolliger & Mabillard,16
Custom Coasters International,12


Output can only be rendered in Databricks