<a href="https://colab.research.google.com/github/SophiaHe/Datacamp_PySpark/blob/master/Big_Data_with_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Course 1: Introduction to PySpark**

See what tables are in your cluster by calling spark.catalog.listTables()

Get the first 10 rows of flights: \\
query = "FROM flights SELECT * LIMIT 10"
flights10 = spark.sql(query)

Show the results:
flights10.show()

Convert the results to a pandas DataFrame:
pd_counts = flight_counts.toPandas()

Create pd_temp, a pandas DataFrame of 10 random numbers \\
import numpy as np
import pandas as pd
pd_temp = pd.DataFrame(np.random.random(10))

Create spark_temp from pd_temp \\
spark_temp = spark.createDataFrame(pd_temp)

Add spark_temp to the catalog \\
spark_temp.createOrReplaceTempView("temp")

Examine the tables in the catalog again \\
print(spark.catalog.listTables())

Read in the airports data \\
file_path = "/usr/local/share/datasets/airports.csv"
airports = spark.read.csv(file_path, header=True)

Create the DataFrame flights \\
flights = spark.table("flights")

Show the head \\
flights.show()




**Course 2: Manipulating Data**

Add duration_hrs \\
flights = flights.withColumn('duration_hrs',flights.air_time/60)

Filter flights by passing a string \\
long_flights1 = flights.filter("distance > 1000")

Filter flights by passing a column of boolean values \\
long_flights2 = flights.filter(flights.distance > 1000)

Select the first set of columns \\
selected1 = flights.select("tailnum","origin","dest")

Select the second set of columns \\
temp = flights.select(flights.origin, flights.dest, flights.carrier)

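Define avg_speed (distance per hour of air time) and select it together with other columns \\
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)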

Create the same table using a SQL expression \\
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

Find the shortest flight from PDX in terms of distance \\
flights.filter(flights.origin == 'PDX').groupBy().min("distance").show()

Average duration of Delta flights from SEA \\
flights.filter(flights.carrier == "DL").filter(flights.origin == 'SEA').groupBy().avg("air_time").show()

Total hours in the air \\
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("duration_hrs").show()

Import pyspark.sql.functions as F \\
import pyspark.sql.functions as F

Standard deviation of departure delay, grouped by month and destination \\
by_month_dest = flights.groupBy("month", "dest")
by_month_dest.agg(F.stddev('dep_delay')).show()

Rename the faa column \\
airports = airports.withColumnRenamed('faa','dest')

Join the DataFrames \\
flights_with_airports = flights.join(airports,on = 'dest', how = 'leftouter')

**Course 3: Machine Learning Pipeline**

At the core of the pyspark.ml module are the Transformer and Estimator classes. Almost every other class in the module behaves similarly to these two basic classes.

Transformer classes have a .transform() method that takes a DataFrame and returns a new DataFrame; usually the original one with a new column appended. For example, you might use the class Bucketizer to create discrete bins from a continuous feature or the class PCA to reduce the dimensionality of your dataset using principal component analysis.

Estimator classes all implement a .fit() method. These methods also take a DataFrame, but instead of returning another DataFrame they return a model object. This can be something like a StringIndexerModel for including categorical data saved as strings in your models, or a RandomForestClassificationModel that uses the random forest algorithm for classification (RandomForestRegressionModel is its regression counterpart).
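To make the fit/transform contract concrete, here is a pure-Python toy that mimics it on a list of dicts standing in for DataFrame rows. `ToyStringIndexer` and `ToyStringIndexerModel` are hypothetical classes, not part of `pyspark.ml`; they only imitate the pattern in which an Estimator's `.fit()` learns state from the data and returns a model whose `.transform()` appends a column. The ordering by descending label frequency mirrors `StringIndexer`'s default; the alphabetical tie-break is a choice made for this toy.

```python
from collections import Counter


class ToyStringIndexerModel:
    """Hypothetical Transformer: holds a learned label -> index mapping."""

    def __init__(self, input_col, output_col, mapping):
        self.input_col = input_col
        self.output_col = output_col
        self.mapping = mapping

    def transform(self, rows):
        # Build new rows with one extra column; the input rows are left unchanged
        return [{**row, self.output_col: self.mapping[row[self.input_col]]} for row in rows]


class ToyStringIndexer:
    """Hypothetical Estimator: fit() learns from the data and returns a model."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        # Most frequent label gets index 0.0; ties are broken alphabetically in this toy
        counts = Counter(row[self.input_col] for row in rows)
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        mapping = {label: float(i) for i, label in enumerate(ordered)}
        return ToyStringIndexerModel(self.input_col, self.output_col, mapping)


rows = [{"carrier": "AA"}, {"carrier": "DL"}, {"carrier": "DL"}, {"carrier": "UA"}]
model = ToyStringIndexer("carrier", "carrier_index").fit(rows)
indexed = model.transform(rows)
print([row["carrier_index"] for row in indexed])  # [1.0, 0.0, 0.0, 2.0]
```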

#### Spark's machine learning algorithms only handle numeric data. That means all of the columns used as model inputs must be either integers or decimals (called "doubles" in Spark)

It's important to note that .cast() works on columns, while .withColumn() works on DataFrames. The only argument you need to pass to .cast() is the kind of value you want to create, in string form. For example, to create integers, you'll pass the argument "integer" and for decimal numbers you'll use "double".

General pattern: dataframe = dataframe.withColumn("col", dataframe.col.cast("new_type")) \\
Cast the month column to an integer \\
model_data = model_data.withColumn("month", model_data.month.cast('integer'))

Convert the boolean is_late column to an integer label \\
model_data = model_data.withColumn("label", model_data.is_late.cast('integer'))

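Import the feature transformers \\
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

Create a StringIndexer \\
carr_indexer = StringIndexer(inputCol='carrier', outputCol='carrier_index')

Create a OneHotEncoder \\
carr_encoder = OneHotEncoder(inputCol='carrier_index', outputCol='carrier_fact')

Encode the dest column the same way \\
dest_indexer = StringIndexer(inputCol='dest', outputCol='dest_index')
dest_encoder = OneHotEncoder(inputCol='dest_index', outputCol='dest_fact')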

Make a VectorAssembler \\
vec_assembler = VectorAssembler(inputCols=['month','air_time','carrier_fact','dest_fact','plane_age'], outputCol='features')
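As a rough illustration of what these stages produce for a single row, the pure-Python sketch below one-hot encodes category indices and concatenates everything into one feature list. It assumes `OneHotEncoder`'s default `dropLast=True`, under which the last category is encoded as all zeros; Spark itself stores the result as a (usually sparse) vector, and the row values here are made up.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a dense 0/1 list, like OneHotEncoder's output."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:  # with drop_last, the last category stays all zeros
        vec[int(index)] = 1.0
    return vec


def assemble(*parts):
    """Concatenate scalars and lists into one flat feature list (VectorAssembler-style)."""
    features = []
    for part in parts:
        if isinstance(part, list):
            features.extend(part)
        else:
            features.append(float(part))
    return features


# Hypothetical row: month=6, air_time=132, carrier index 1 of 3, dest index 0 of 2, plane_age=12
features = assemble(6, 132, one_hot(1.0, 3), one_hot(0.0, 2), 12)
print(features)  # [6.0, 132.0, 0.0, 1.0, 1.0, 12.0]
```

Dropping the last category avoids a redundant column: if all the other indicator values are zero, the row must belong to the last category.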

Import Pipeline \\
from pyspark.ml import Pipeline

Make the pipeline \\
flights_pipe = Pipeline(stages=[dest_indexer,dest_encoder,carr_indexer,carr_encoder,vec_assembler])

Fit and transform the data \\
piped_data = flights_pipe.fit(model_data).transform(model_data)

Split the data into training and test sets \\
training, test = piped_data.randomSplit([0.6,0.4])
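`randomSplit()` does not cut the data at an exact row count: each row is assigned to a split at random according to the weights, so a 0.6/0.4 split is only approximately 60/40, and the result can change between runs unless you pass a `seed`. The pure-Python sketch below imitates that per-row assignment; it illustrates the idea and is not Spark's sampling code.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to a split with probability proportional to its weight."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split in [0, 1]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform draw in [0, 1)
        for i, upper in enumerate(bounds):
            # the last split also catches any floating-point rounding gap
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


training, test = random_split(range(1000), [0.6, 0.4], seed=42)
print(len(training), len(test))  # roughly 600 and 400, not exactly
```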

**Course 4: Model tuning and selection**

Import LogisticRegression \\
from pyspark.ml.classification import LogisticRegression

Create a LogisticRegression Estimator \\
lr = LogisticRegression()

Import the evaluation submodule \\
import pyspark.ml.evaluation as evals

Create a BinaryClassificationEvaluator \\
evaluator = evals.BinaryClassificationEvaluator(metricName='areaUnderROC')

Import the tuning submodule \\
import pyspark.ml.tuning as tune

Create the parameter grid \\
grid = tune.ParamGridBuilder()

Add the hyperparameters \\
import numpy as np
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

Build the grid \\
grid = grid.build()

Create the CrossValidator \\
cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )
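It helps to count how much work this cross-validation does. The grid has 10 `regParam` values (`np.arange(0, .1, .01)` yields 0.00 through 0.09) times 2 `elasticNetParam` values, i.e. 20 combinations; `CrossValidator` trains one model per combination per fold (3 folds by default) and then refits the best combination on the full training set. The pure-Python sketch below only enumerates that arithmetic with `itertools.product`; it does not call Spark.

```python
from itertools import product

reg_params = [i / 100 for i in range(10)]  # 0.00 .. 0.09, the same ten values as np.arange(0, .1, .01) up to rounding
elastic_net_params = [0, 1]
num_folds = 3                              # CrossValidator's default numFolds

grid = list(product(reg_params, elastic_net_params))
cv_fits = len(grid) * num_folds            # one fit per (combination, fold)
total_fits = cv_fits + 1                   # plus the final refit of the best combination

print(len(grid), cv_fits, total_fits)  # 20 60 61
```

Each extra grid value multiplies the cost, which is why grids are usually kept small.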

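Fit the cross-validation models and extract the best one \\
models = cv.fit(training)
best_lr = models.bestModel

Use the model to predict the test set \\
test_results = best_lr.transform(test)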

Evaluate the predictions \\
print(evaluator.evaluate(test_results))

### **Big data foundation with PySpark**

**Course 1: Introduction to big data analysis in Spark**

Print the version of SparkContext \\
print("The version of Spark Context in the PySpark shell is", sc.version)

Print the Python version of SparkContext \\
print("The Python version of Spark Context in the PySpark shell is", sc.pythonVer)

Print the master of SparkContext \\
print("The master of Spark Context in the PySpark shell is", sc.master)

Create a Python list of numbers from 1 to 100 \\
numb = range(1, 101)

Load the list into PySpark  \\
spark_data = sc.parallelize(numb)

Load a local file into PySpark shell \\
lines = sc.textFile(file_path)

**Use of lambda() with map():** \\
The map() function applies a given function to each item of an iterable (list, tuple, etc.). In Python 3 it returns a lazy map object (an iterator), so wrap it in list() when you need a list. The general syntax is map(fun, iter). map() is often combined with a lambda function: map(lambda <argument>: <expression>, iter)

Square all numbers in my_list \\
squared_list_lambda = list(map(lambda x: x ** 2, my_list))

Print the result of the map function \\
print("The squared numbers are", squared_list_lambda)

**Use of lambda() with filter():** \\
Another function used extensively in Python is filter(). It takes a function and an iterable and keeps only the items for which the function returns True; like map(), it returns an iterator in Python 3. The general syntax is filter(function, iterable). Like map(), filter() is often combined with a lambda function: filter(lambda <argument>: <expression>, iterable)

Filter numbers divisible by 10 \\
filtered_list = list(filter(lambda x: (x%10 == 0), my_list2))
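Because `map()` and `filter()` return lazy iterators in Python 3, the examples above wrap them in `list()`. The sketch below uses small stand-in values for `my_list` and `my_list2` (the originals are not shown) and checks the results against the equivalent list comprehensions.

```python
my_list = [1, 2, 3, 4]            # stand-in data; the original my_list is not shown
my_list2 = [10, 21, 30, 45, 50]   # stand-in data for my_list2

squares_iter = map(lambda x: x ** 2, my_list)
print(type(squares_iter).__name__)  # map: nothing has been computed yet

squared = list(squares_iter)      # consuming the iterator runs the lambda
divisible_by_10 = list(filter(lambda x: x % 10 == 0, my_list2))

# Equivalent list comprehensions
assert squared == [x ** 2 for x in my_list]
assert divisible_by_10 == [x for x in my_list2 if x % 10 == 0]
print(squared, divisible_by_10)   # [1, 4, 9, 16] [10, 30, 50]
```

An iterator can be consumed only once: calling `list(squares_iter)` a second time returns an empty list.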



**Course 2: Programming in PySpark RDDs**

**Create an RDD from a list of words** \\
RDD = sc.parallelize(["Spark", "is", "a", "framework", "for", "Big Data processing"])

Print out the type of the created object \\
print("The type of RDD is", type(RDD))

**Create a fileRDD from file_path** \\
fileRDD = sc.textFile(file_path)

Create a fileRDD_part from file_path with 5 partitions \\
fileRDD_part = sc.textFile(file_path, minPartitions = 5)

Check the number of partitions in fileRDD_part \\
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())

**Create a map() transformation to cube numbers** \\
cubedRDD = numbRDD.map(lambda x: x ** 3)

Collect the results \\
numbers_all = cubedRDD.collect()

Print the numbers from numbers_all \\
for numb in numbers_all:
	print(numb)

**Filter the fileRDD to select lines with the Spark keyword** \\
fileRDD_filter = fileRDD.filter(lambda line: 'Spark' in line)

**How many lines in fileRDD contain the keyword Spark?** \\
print("The total number of lines with the keyword Spark is", fileRDD_filter.count())

Print the first four lines of fileRDD_filter \\
for line in fileRDD_filter.take(4): 
  print(line)

Create PairRDD Rdd with key value pairs \\
Rdd = sc.parallelize([(1,2), (3,4), (3,6), (4,5)])

**Apply the reduceByKey() operation on Rdd** \\
Rdd_Reduced = Rdd.reduceByKey(lambda x, y: x + y)

Iterate over the result and print the output \\
for num in Rdd_Reduced.collect(): 
  print("Key {} has {} Counts".format(num[0], num[1]))

:Key 4 has 5 Counts
Key 1 has 2 Counts
Key 3 has 10 Counts
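`reduceByKey()` merges all values that share a key using the given function. The pure-Python sketch below reproduces what it computes on the same pairs, with a dict standing in for Spark's per-partition combine and shuffle; it illustrates the semantics only, and the order of Spark's collected output may differ.

```python
def reduce_by_key(pairs, func):
    """Combine the values of each key with func, as reduceByKey does (single-machine sketch)."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


pairs = [(1, 2), (3, 4), (3, 6), (4, 5)]
reduced = reduce_by_key(pairs, lambda x, y: x + y)
print(reduced)  # {1: 2, 3: 10, 4: 5}

# sortByKey(ascending=False) then orders the pairs by key, largest first
print(sorted(reduced.items(), key=lambda kv: kv[0], reverse=True))  # [(4, 5), (3, 10), (1, 2)]
```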

**Sort the reduced RDD by key in descending order** \\
Rdd_Reduced_Sort = Rdd_Reduced.sortByKey(ascending=False)

Iterate over the result and print the output \\
for num in Rdd_Reduced_Sort.collect():
  print("Key {} has {} Counts".format(num[0], num[1]))

: Key 4 has 5 Counts
Key 3 has 10 Counts
Key 1 has 2 Counts

**Count the pairs for each key with countByKey() (an action that returns a dictionary to the driver)** \\
total = Rdd.countByKey()

What is the type of total? \\
print("The type of total is", type(total))

**Iterate over the total and print the output** \\
for k, v in total.items():
  print("key", k, "has", v, "counts")

:The type of total is <class 'collections.defaultdict'>
key 1 has 1 counts
key 3 has 2 counts
key 4 has 1 counts
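Unlike `reduceByKey()`, which is a transformation that returns a new RDD, `countByKey()` is an action: it ignores the values, counts how many pairs each key has, and returns the counts to the driver. A pure-Python equivalent with `collections.Counter` shows the counts expected for `Rdd`:

```python
from collections import Counter

pairs = [(1, 2), (3, 4), (3, 6), (4, 5)]

# Count occurrences of each key; the values (2, 4, 6, 5) are not used
key_counts = Counter(key for key, _ in pairs)
for k, v in key_counts.items():
    print("key", k, "has", v, "counts")
# key 1 has 1 counts
# key 3 has 2 counts
# key 4 has 1 counts
```

Because the whole result is collected on the driver, `countByKey()` is only appropriate when the number of distinct keys is small.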

Create a baseRDD from the file path \\
baseRDD = sc.textFile(file_path)

Split the lines of baseRDD into words \\
splitRDD = baseRDD.flatMap(lambda x: x.split())

Count the total number of words \\
print("Total number of words in splitRDD:", splitRDD.count())

Remove stop words, comparing each word in lower case against stop_words \\
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in stop_words)

Create a tuple of the word and 1 \\
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))

Count the number of occurrences of each word \\
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)

Display the first 10 words and their frequencies \\
for word in resultRDD.take(10):
	print(word)

Swap the keys and values \\
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))

Sort the keys in descending order \\
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

Show the top 10 most frequent words and their frequencies \\
for word in resultRDD_swap_sort.take(10):
	print("{} has {} counts".format(word[1], word[0]))

:   thou has 4247 counts
    thy has 3630 counts
    shall has 3018 counts
    good has 2046 counts
    would has 1974 counts
    Enter has 1926 counts
    thee has 1780 counts
    I'll has 1737 counts
    hath has 1614 counts
    like has 1452 counts
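The word-count steps above (split with `flatMap`, drop stop words, map to `(word, 1)`, sum with `reduceByKey`, swap, and sort descending) can be traced on a tiny input in plain Python. The text and the `stop_words` set below are made-up stand-ins for the Shakespeare file and stop-word list used in the course.

```python
from collections import defaultdict

lines = ["Thou art a villain", "thou knowest the villain", "Enter the king"]
stop_words = {"a", "the", "art"}  # hypothetical stop-word list

# map() would give one list of words per line (a list of lists) ...
nested = [line.split() for line in lines]
# ... while flatMap() flattens them into a single sequence of words
words = [w for line in lines for w in line.split()]

# filter: drop stop words, comparing in lower case
kept = [w for w in words if w.lower() not in stop_words]

# map to (word, 1) and reduceByKey: count occurrences (still case-sensitive)
counts = defaultdict(int)
for w in kept:
    counts[w] += 1

# swap to (count, word) and sort by count, descending
top = sorted(((c, w) for w, c in counts.items()), key=lambda cw: cw[0], reverse=True)
for c, w in top[:3]:
    print("{} has {} counts".format(w, c))
```

Because only the stop-word check lowercases, "Thou" and "thou" are counted as different words; the same is true of the RDD code above.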

**Course 3: PySpark SQL & DataFrames**

Create a list of tuples \\
sample_list = [('Mona',20), ('Jennifer',34),('John',20), ('Jim',26)]

Create a RDD from the list \\
rdd = sc.parallelize(sample_list)

Create a PySpark DataFrame \\
names_df = spark.createDataFrame(rdd, schema=['Name', 'Age'])

Check the type of names_df \\
print("The type of names_df is", type(names_df))

Create a DataFrame from file_path \\
people_df = spark.read.csv(file_path, header=True, inferSchema=True)

Print the first 10 observations \\
people_df.show(10)

Count the number of rows \\
print("There are {} rows in the people_df DataFrame.".format(people_df.count()))

Count the number of columns and their names \\
print("There are {} columns in the people_df DataFrame and their names are {}".format(len(people_df.columns), people_df.columns))

Select name, sex and date of birth columns\\
people_df_sub = people_df.select('name', 'sex', 'date of birth')

Print the first 10 observations from people_df_sub\\
people_df_sub.show(10)

Remove duplicate entries from people_df_sub\\
people_df_sub_nodup = people_df_sub.dropDuplicates()

Filter people_df to select females \\
people_df_female = people_df.filter(people_df.sex == "female")

Create a temporary table "people" \\
people_df.createOrReplaceTempView("people")

Construct a query to select the names of the people from the temporary table "people" \\
query = '''SELECT name FROM people'''

Assign the result of Spark's query to people_df_names \\
people_df_names = spark.sql(query)

Filter the people table to select female sex \\
people_female_df = spark.sql('SELECT * FROM people WHERE sex=="female"')

Check the schema of columns \\
fifa_df.printSchema()

Summary statistics of the numeric columns \\
spark_dataFrame.describe().show()

**Visualization using toPandas() and HandySpark** \\
Check the column names of names_df \\
print("The column names of names_df are", names_df.columns)

Convert to Pandas DataFrame  \\
df_pandas = names_df.toPandas()

Create a horizontal bar plot and display it \\
import matplotlib.pyplot as plt
df_pandas.plot(kind='barh', x='Name', y='Age', colormap='winter_r')
plt.show()

**Course 4: PySpark MLlib Algorithms**

Import the library for ALS (and the Rating class used below) \\
from pyspark.mllib.recommendation import ALS, Rating

Import the library for Logistic Regression \\
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

Import the library for Kmeans \\
from pyspark.mllib.clustering import KMeans

**1. Collaborative Filtering** \\
Collaborative filtering finds users who share common interests and uses their ratings to make recommendations; it is the basis of many recommendation systems. \\

Load the data into RDD \\
data = sc.textFile(file_path)

Split the RDD \\
ratings = data.map(lambda l: l.split(','))

Transform the ratings RDD \\
ratings_final = ratings.map(lambda line: Rating(int(line[0]), int(line[1]), float(line[2])))

Split the data into training and test \\
training_data, test_data = ratings_final.randomSplit([0.8, 0.2])

Create the ALS model on the training data \\
model = ALS.train(training_data, rank=10, iterations=10)

Drop the ratings column \\
testdata_no_rating = test_data.map(lambda p: (p[0], p[1]))

Predict ratings for the test (user, product) pairs \\
predictions = model.predictAll(testdata_no_rating)

Show the first two predictions \\
predictions.take(2)

Prepare ratings data \\
rates = ratings_final.map(lambda r: ((r[0], r[1]), r[2]))

Prepare predictions data \\
preds = predictions.map(lambda r: ((r[0], r[1]), r[2]))

Join the ratings data with predictions data \\
rates_and_preds = rates.join(preds)

Calculate and print MSE \\
MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error of the model for the test data = {:.2f}".format(MSE))
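The MSE step keys both RDDs by `(user, product)` so that `join()` lines each true rating up with its prediction; only keys present on both sides survive, so the error is measured on the pairs that were actually predicted. The pure-Python sketch below reproduces that join-then-average logic on a few made-up ratings.

```python
# Hypothetical ((user, product), rating) pairs; the values are made up for illustration
rates = {(1, 10): 4.0, (1, 20): 3.0, (2, 10): 5.0, (2, 30): 1.0}
preds = {(1, 10): 3.5, (2, 10): 4.0}  # predictions exist only for the test pairs

# join(): keep keys present on both sides, pairing (actual, predicted)
rates_and_preds = {k: (rates[k], preds[k]) for k in rates if k in preds}

# Mean of the squared differences
mse = sum((actual - pred) ** 2 for actual, pred in rates_and_preds.values()) / len(rates_and_preds)
print("Mean Squared Error = {:.2f}".format(mse))
```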


**2. Classification** \\
Load the datasets into RDDs \\
spam_rdd = sc.textFile(file_path_spam)
non_spam_rdd = sc.textFile(file_path_non_spam)

Split the email messages into words \\
spam_words = spam_rdd.flatMap(lambda email: email.split(' '))
non_spam_words = non_spam_rdd.flatMap(lambda email: email.split(' '))

Print the first element in the split RDD \\
print("The first element in spam_words is", spam_words.first())
print("The first element in non_spam_words is", non_spam_words.first())

:    The first element in spam_words is You
    The first element in non_spam_words is Rofl.

Create a HashingTF instance with 200 features \\
from pyspark.mllib.feature import HashingTF
tf = HashingTF(numFeatures=200)

Map each word to one feature \\
spam_features = tf.transform(spam_words)
non_spam_features = tf.transform(non_spam_words)
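`HashingTF` uses the hashing trick: each term is hashed to one of `numFeatures` buckets and that bucket's count is incremented, so no vocabulary has to be stored, at the cost of unrelated terms occasionally colliding in the same bucket. The sketch below shows the idea in plain Python. It uses `zlib.crc32` as a stable stand-in hash, which is an assumption for illustration and not the hash function Spark uses, so the bucket indices will not match Spark's.

```python
import zlib


def hashing_tf(terms, num_features=200):
    """Map an iterable of terms to a fixed-length count vector via the hashing trick."""
    vec = [0.0] * num_features
    for term in terms:
        index = zlib.crc32(term.encode("utf-8")) % num_features  # bucket for this term
        vec[index] += 1.0
    return vec


email = "You have won a prize You".split(" ")
vec = hashing_tf(email)
print(len(vec), sum(vec))  # 200 6.0
```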

Label the features: 1 for spam, 0 for non-spam \\
from pyspark.mllib.regression import LabeledPoint
spam_samples = spam_features.map(lambda features: LabeledPoint(1, features))
non_spam_samples = non_spam_features.map(lambda features: LabeledPoint(0, features))

Combine the two datasets (union stacks the rows; join would pair them by key) \\
samples = spam_samples.union(non_spam_samples)

Split the data into training and testing \\
train_samples,test_samples = samples.randomSplit([0.8, 0.2])

Train the model \\
model = LogisticRegressionWithLBFGS.train(train_samples)

Create a prediction label from the test data \\
predictions = model.predict(test_samples.map(lambda x: x.features))

Combine original labels with the predicted labels \\
labels_and_preds = test_samples.map(lambda x: x.label).zip(predictions)

Check the accuracy of the model on the test data \\
accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(test_samples.count())
print("Model accuracy : {:.2f}".format(accuracy))

**3. Clustering**

Load the dataset into a RDD \\
clusterRDD = sc.textFile(file_path)

Split the RDD based on tab \\
rdd_split = clusterRDD.map(lambda x: x.split("\t"))

Transform the split RDD by creating a list of integers \\
rdd_split_int = rdd_split.map(lambda x: [int(x[0]), int(x[1])])

Count the number of rows in RDD \\
print("There are {} rows in the rdd_split_int dataset".format(rdd_split_int.count()))

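Define the error of a point as its Euclidean distance to the nearest cluster center \\
from math import sqrt
def error(point):
    center = model.centers[model.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

Train the model with k from 13 to 16 and compute WSSSE (as written, this sums the unsquared distances returned by error()) \\
for clst in range(13, 17):
    model = KMeans.train(rdd_split_int, clst, seed=1)
    WSSSE = rdd_split_int.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("The cluster {} has Within Set Sum of Squared Error {}".format(clst, WSSSE))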

:The cluster 13 has Within Set Sum of Squared Error 249164132.49410182
The cluster 14 has Within Set Sum of Squared Error 209371154.24941802
The cluster 15 has Within Set Sum of Squared Error 169394691.52639425
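WSSSE is normally defined as the sum, over all points, of the squared Euclidean distance to the nearest cluster center. It generally shrinks as k grows, so it is usually read for where the improvement levels off rather than minimized outright. The pure-Python sketch below computes it for made-up points and centers.

```python
def squared_distance(p, c):
    """Squared Euclidean distance between two points of equal dimension."""
    return sum((a - b) ** 2 for a, b in zip(p, c))


def wssse(points, centers):
    """Within Set Sum of Squared Errors: each point's squared distance to its nearest center."""
    return sum(min(squared_distance(p, c) for c in centers) for p in points)


points = [(0, 0), (0, 2), (10, 10), (10, 12)]
one_center = [(5, 6)]
two_centers = [(0, 1), (10, 11)]
print(wssse(points, one_center), wssse(points, two_centers))  # 204 4
```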

Train the model again with the best k \\
model = KMeans.train(rdd_split_int, k=15, seed=1)

Get cluster centers \\
cluster_centers = model.clusterCenters

Convert rdd_split_int RDD into Spark DataFrame \\
rdd_split_int_df = spark.createDataFrame(rdd_split_int, schema=["col1", "col2"])

Convert Spark DataFrame into Pandas DataFrame \\
rdd_split_int_df_pandas = rdd_split_int_df.toPandas()

Convert "cluster_centers" that you generated earlier into Pandas DataFrame \\
cluster_centers_pandas = pd.DataFrame(cluster_centers, columns=["col1", "col2"])

Create an overlaid scatter plot \\
plt.scatter(rdd_split_int_df_pandas["col1"], rdd_split_int_df_pandas["col2"])
plt.scatter(cluster_centers_pandas["col1"], cluster_centers_pandas["col2"], color="red", marker="x")
plt.show()

## **Cleaning Data with PySpark**

**Course 1: DataFrame details**

Import the pyspark.sql.types library \\
`from pyspark.sql.types import *`

**Define a new schema using the StructType method** \\
Define a StructField for each field: its name, its type, and whether it is nullable \\
`people_schema = StructType([
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False)
])`

Load the CSV file \\
`aa_dfw_df = spark.read.format('csv').options(Header=True).load('AA_DFW_2018.csv.gz')`

Add the airport column using the F.lower() method \\
`aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))`

Drop the Destination Airport column \\
`aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])`

Show the DataFrame \\
`aa_dfw_df.show()`

The **Parquet format** is a columnar data store that also records statistics (such as minimum and maximum values) for blocks of each column. This lets Spark read only the columns a query uses and skip blocks that cannot match a filter (predicate pushdown), instead of reading the entire dataset. On large datasets this often improves performance dramatically.
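The sketch below is a toy model of how min/max statistics let a reader skip data: each "row group" records the minimum and maximum of one column, and groups whose range cannot satisfy the filter are never read. The layout is invented for illustration and is far simpler than the real Parquet format.

```python
# Each toy row group keeps min/max statistics for a flight_duration column
row_groups = [
    {"min": 30, "max": 90, "rows": [30, 55, 90]},
    {"min": 95, "max": 180, "rows": [95, 120, 180]},
    {"min": 200, "max": 400, "rows": [200, 310, 400]},
]


def scan_greater_than(groups, threshold):
    """Return the matching values and how many groups were actually read."""
    matches, groups_read = [], 0
    for group in groups:
        if group["max"] <= threshold:
            continue  # statistics prove no row can match: skip the whole group
        groups_read += 1
        matches.extend(v for v in group["rows"] if v > threshold)
    return matches, groups_read


matches, groups_read = scan_greater_than(row_groups, 150)
print(matches, groups_read)  # [180, 200, 310, 400] 2
```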

Save the df3 DataFrame in Parquet format \\
`df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')`

Read the Parquet file into a new DataFrame and run a count \\
`print(spark.read.parquet('AA_DFW_ALL.parquet').count())`

**Run SQL query on Parquet data** \\
Read the Parquet file into flights_df \\
`flights_df = spark.read.parquet('AA_DFW_ALL.parquet')`

Register the temp table \\
`flights_df.createOrReplaceTempView('flights')`

Run a SQL query of the average flight duration \\
`avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0]`

`print('The average flight time is: %d' % avg_duration)`

**Course 2: Manipulating DataFrames in the real world**

**Show the distinct VOTER_NAME entries** \\
`voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)`

Filter voter_df where the VOTER_NAME is 1-20 characters in length \\
`voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) <= 20')`

**Filter out voter_df where the VOTER_NAME contains an underscore** \\
`voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))`

Show the distinct VOTER_NAME entries again \\
`voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)`

**Add a new column called splits separated on whitespace** \\
`voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))`

Create a new column called first_name based on the first item in splits \\
`voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))`

**Get the last entry of the splits list and create a column called last_name** \\
`voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))`

Drop the splits column \\
`voter_df = voter_df.drop('splits')`
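The split-first-last logic can be checked in plain Python before running it on the cluster. `re.split(r"\s+", ...)` applies the same regular expression as the `F.split()` call above; the sample names are made up.

```python
import re


def split_name(full_name):
    """Return (first_name, last_name) the way the splits column is used above."""
    splits = re.split(r"\s+", full_name)       # same pattern as F.split(..., r'\s+')
    return splits[0], splits[len(splits) - 1]  # getItem(0) and getItem(size - 1)


print(split_name("Sandy  K. Greyson"))  # ('Sandy', 'Greyson')
```

A single-word name returns the same word as both first and last name, which may or may not be what you want in the cleaned data.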

Add a random_val column to voter_df for any voter with the title 'Councilmember' (other rows get null) \\
`voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand()))`

**Add a column to voter_df for a voter based on their position** \\
`voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))`

Use the .filter() clause with random_val \\
`voter_df.filter(voter_df.random_val == 0).show()`

Return a space-separated string of all names except the last \\
`def getFirstAndMiddle(names):
    return ' '.join(names[:-1])`

**Define the method as a UDF** \\
`udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())`

Create a new column using your UDF \\
`voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))`

Drop the unnecessary columns, then show the DataFrame \\
`voter_df = voter_df.drop('first_name')`
`voter_df = voter_df.drop('splits')`
`voter_df.show()`

Select all the unique council voters \\
`voter_df = df.select(df["VOTER NAME"]).distinct()`

Count the rows in voter_df \\
`print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())`

Add a ROW_ID \\
`voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())`

Show the rows with 10 highest IDs in the set \\
`voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)`

**Print the number of partitions in each DataFrame** \\
`print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())`
`print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())`

**Add a ROW_ID field to each DataFrame** \\
`voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())`
`voter_df_single = voter_df_single.withColumn('ROW_ID', F.monotonically_increasing_id())`

Show the top 10 IDs in each DataFrame  \\
`voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)`
`voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)`
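The IDs from `F.monotonically_increasing_id()` are unique and increasing but not consecutive. Spark's documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits, which is why a DataFrame spread over several partitions shows much larger IDs than one held in a single partition. The sketch below reproduces that arithmetic in plain Python for made-up partition sizes.

```python
def monotonic_ids(partition_sizes):
    """IDs as (partition_index << 33) + position within the partition."""
    ids = []
    for partition_index, size in enumerate(partition_sizes):
        for position in range(size):
            ids.append((partition_index << 33) + position)
    return ids


print(monotonic_ids([3]))     # one partition: [0, 1, 2]
print(monotonic_ids([2, 2]))  # two partitions: [0, 1, 8589934592, 8589934593]
```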

Determine the highest ROW_ID and save it in previous_max_ID \\
`previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]`

Add a ROW_ID column to voter_df_april that starts just after previous_max_ID \\
`voter_df_april = voter_df_april.withColumn('ROW_ID', previous_max_ID + 1 + F.monotonically_increasing_id())`

Show the ROW_ID from both DataFrames and compare \\
`voter_df_april.select('ROW_ID').show()`
`voter_df_march.select('ROW_ID').show()`
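Offsetting the new IDs by the previous maximum keeps the two months' IDs from overlapping, but because `monotonically_increasing_id()` starts at 0, adding only `previous_max_ID` would give the first April row the same ID as the last March row. The plain-Python check below, with made-up ID values, shows why the offset should be `previous_max_ID + 1`.

```python
march_ids = [0, 1, 2, 8589934592]  # hypothetical existing ROW_ID values
april_raw_ids = [0, 1, 2]          # fresh monotonically_increasing_id() values

previous_max_id = max(march_ids)

naive = [previous_max_id + i for i in april_raw_ids]
shifted = [previous_max_id + 1 + i for i in april_raw_ids]

print(set(march_ids) & set(naive))    # {8589934592}: a duplicate ID
print(set(march_ids) & set(shifted))  # set(): no overlap
```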



**Course 3: Improving Performance**

Add caching to the unique rows in departures_df \\
`departures_df = departures_df.distinct().cache()`

Remove departures_df from the cache \\
`departures_df.unpersist()`

Check the cache status again \\
`print("Is departures_df cached?: %s" % departures_df.is_cached)`

Import the full and split files into DataFrames \\
`full_df = spark.read.csv('departures_full.txt.gz')`

`split_df = spark.read.csv('departures_*.txt.gz')`

Name of the Spark application instance \\
`app_name = spark.conf.get('spark.app.name')`

Driver TCP port \\
`driver_tcp_port = spark.conf.get('spark.driver.port')`

Number of shuffle partitions (used by joins and aggregations) \\
`num_partitions = spark.conf.get('spark.sql.shuffle.partitions')`

Show the results \\
`print("Name: %s" % app_name)`

`print("Driver TCP port: %s" % driver_tcp_port)`

`print("Number of partitions: %s" % num_partitions)`

Configure Spark to use 500 partitions \\
`spark.conf.set('spark.sql.shuffle.partitions', 500)`
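`spark.sql.shuffle.partitions` (200 by default) sets how many partitions a shuffle, such as the one behind a join or `groupBy`, writes its output into. Rows are routed by hashing the key, so every row with the same key lands in the same partition. The pure-Python sketch below shows that routing; it uses `zlib.crc32` as a stand-in for Spark's own hash function (Spark uses Murmur3), so the actual partition numbers will differ.

```python
import zlib
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Route each row to partition hash(key) % num_partitions."""
    partitions = defaultdict(list)
    for row in rows:
        p = zlib.crc32(str(row[key]).encode("utf-8")) % num_partitions
        partitions[p].append(row)
    return partitions


rows = [{"dest": d} for d in ["SEA", "PDX", "SEA", "LAX", "PDX", "SEA"]]
parts = hash_partition(rows, "dest", num_partitions=500)

# Every row with the same key ends up in exactly one partition
for dest in {"SEA", "PDX", "LAX"}:
    holding = [p for p, rs in parts.items() if any(r["dest"] == dest for r in rs)]
    assert len(holding) == 1
print(len(parts), "non-empty partitions out of 500")
```

With only a few distinct keys, almost all of the 500 partitions stay empty, which is one reason a larger value is not automatically faster.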

Import the broadcast method from pyspark.sql.functions \\
`from pyspark.sql.functions import broadcast`

Join the flights_df and airports_df DataFrames using broadcasting \\
`broadcast_df = flights_df.join(broadcast(airports_df), \
    flights_df["Destination Airport"] == airports_df["IATA"] )`

Show the query plan and compare against the original \\
`broadcast_df.explain()`
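A broadcast join ships a full copy of the small DataFrame to every executor, so each partition of the large DataFrame can be joined locally against an in-memory hash table instead of shuffling both sides. The plain-Python sketch below imitates one executor's work: build a dict from the small table, then stream the large table through it. The table contents are made up, and this illustrates the technique rather than Spark's implementation.

```python
airports = [{"IATA": "SEA", "name": "Seattle-Tacoma"}, {"IATA": "PDX", "name": "Portland"}]
flights = [
    {"flight": 1, "Destination Airport": "SEA"},
    {"flight": 2, "Destination Airport": "PDX"},
    {"flight": 3, "Destination Airport": "JFK"},  # no match: dropped by an inner join
]

# Build side: hash table from the small (broadcast) table, keyed on the join column
lookup = {row["IATA"]: row for row in airports}

# Probe side: each flight row looks up its airport; the flights table is never shuffled
joined = [
    {**flight, **lookup[flight["Destination Airport"]]}
    for flight in flights
    if flight["Destination Airport"] in lookup
]
print([(r["flight"], r["name"]) for r in joined])
```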

**Course 4: Complex processing and data pipelines**

Import the data to a DataFrame \\
`departures_df = spark.read.csv('2015-departures.csv.gz', header=True)`

Remove any duration of 0 \\
`departures_df = departures_df.filter(departures_df[3] >0)`

Add an ID column \\
`departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())`

Write the file out to JSON format \\
`departures_df.write.json('output.json')`

Import the file to a DataFrame and perform a row count \\ 
`annotations_df = spark.read.csv('annotations.csv.gz', sep='|')`

`full_count = annotations_df.count()`

Count the number of rows beginning with '#' \\
`comment_count = annotations_df.where(F.col('_c0').startswith('#')).count()`

Import the file to a new DataFrame, without commented rows \\
`no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')`

Count the new DataFrame and verify the difference is as expected \\
`no_comments_count = no_comments_df.count()`

`print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))`


Split _c0 on the tab character and store the list in a variable \\
`tmp_fields = F.split(annotations_df['_c0'], '\t')`

Create the colcount column on the DataFrame \\
`annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))`

Remove any rows containing fewer than 5 fields \\
`annotations_df_filtered = annotations_df.filter(~ (annotations_df.colcount < 5))`

Count the number of rows before and after filtering \\
`initial_count = annotations_df.count()`

`final_count = annotations_df_filtered.count()`

`print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))`
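The row checks in this section, dropping lines that start with `#` and keeping only rows whose tab-split `_c0` has at least 5 fields, can be prototyped in plain Python on a few sample lines. The sample data is invented; because the file is read with `sep='|'`, each tab-separated record arrives whole in `_c0`, which is why the split on `\t` is needed.

```python
sample_lines = [
    "# annotation file, generated 2019",
    "img_001.jpg\t1\t2\t3\t4",
    "img_002.jpg\t1\t2",
    "img_003.jpg\t5\t6\t7\t8\t9",
]

# comment='#': skip lines that start with the comment character
no_comments = [line for line in sample_lines if not line.startswith("#")]
comment_count = len(sample_lines) - len(no_comments)

# colcount = size(split(_c0, '\t')); keep rows with at least 5 fields
kept = [line for line in no_comments if len(line.split("\t")) >= 5]

print("Comment count: %d\nRemaining: %d\nValid: %d" % (comment_count, len(no_comments), len(kept)))
```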


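```python
# The following are from https://medium.com/@rmache/big-data-with-spark-in-google-colab-7c046e24b3
# Install Spark-related dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Old releases are removed from Apache mirrors; the Apache archive keeps them
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz

!pip install -q findspark
!pip install pyspark

# Set up required environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Point Python at SPARK_HOME and start a local SparkSession
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
```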

```python
# Point Colaboratory to your Google Drive
from google.colab import drive
drive.mount('/content/gdrive')
```

Mounted at /content/gdrive


```python
# Download datasets directly to your Google Drive "Colab Datasets" folder
import os
import requests

target_dir = "/content/gdrive/My Drive/Colab Datasets"
os.makedirs(target_dir, exist_ok=True)  # create the folder if it does not exist yet

# 2007 and 2008 data
for year in (2007, 2008):
    file_url = "http://stat-computing.org/dataexpo/2009/{}.csv.bz2".format(year)
    r = requests.get(file_url, stream=True)
    r.raise_for_status()  # stop on HTTP errors instead of saving an error page
    with open(os.path.join(target_dir, "{}.csv.bz2".format(year)), "wb") as file:
        # Stream the response to disk in 1 KB chunks
        for block in r.iter_content(chunk_size=1024):
            if block:
                file.write(block)
```