# Using Spark to do Data Analysis

#### Load Dataset

In [1]:
val flightsCSV = "datasets/usa_carrier_only.csv"
val flightsDS = sc.textFile(flightsCSV)

#### Action Operations

Actions trigger the computation of an RDD and return a result to the caller (the driver program), for example the elements themselves or a count of them.

In [2]:
flightsDS.collect

Array("UNIQUE_CARRIER","UNIQUE_CARRIER_NAME","ORIGIN_AIRPORT_ID","ORIGIN","DEST_AIRPORT_ID","DEST","MONTH",, "04Q","Tradewind Aviation",10135,"ABE",10721,"BOS",12,, "04Q","Tradewind Aviation",10154,"ACK",12197,"HPN",12,, "04Q","Tradewind Aviation",10154,"ACK",12197,"HPN",12,, "04Q","Tradewind Aviation",10154,"ACK",15167,"TEB",12,, "04Q","Tradewind Aviation",10198,"AGC",12197,"HPN",12,, "04Q","Tradewind Aviation",10208,"AGS",12197,"HPN",12,, "04Q","Tradewind Aviation",10257,"ALB",15167,"TEB",12,, "04Q","Tradewind Aviation",10322,"AOO",15167,"TEB",12,, "04Q","Tradewind Aviation",10423,"AUS",11259,"DAL",12,, "04Q","Tradewind Aviation",10431,"AVL",12197,"HPN",12,, "04Q","Tradewind Aviation",10540,"BED",14539,"RKD",12,, "04Q","Tradewind Aviation",10540,"BED",15167,"TEB"...

In [3]:
flightsDS.first

"UNIQUE_CARRIER","UNIQUE_CARRIER_NAME","ORIGIN_AIRPORT_ID","ORIGIN","DEST_AIRPORT_ID","DEST","MONTH",

In [4]:
flightsDS.take(10)

Array("UNIQUE_CARRIER","UNIQUE_CARRIER_NAME","ORIGIN_AIRPORT_ID","ORIGIN","DEST_AIRPORT_ID","DEST","MONTH",, "04Q","Tradewind Aviation",10135,"ABE",10721,"BOS",12,, "04Q","Tradewind Aviation",10154,"ACK",12197,"HPN",12,, "04Q","Tradewind Aviation",10154,"ACK",12197,"HPN",12,, "04Q","Tradewind Aviation",10154,"ACK",15167,"TEB",12,, "04Q","Tradewind Aviation",10198,"AGC",12197,"HPN",12,, "04Q","Tradewind Aviation",10208,"AGS",12197,"HPN",12,, "04Q","Tradewind Aviation",10257,"ALB",15167,"TEB",12,, "04Q","Tradewind Aviation",10322,"AOO",15167,"TEB",12,, "04Q","Tradewind Aviation",10423,"AUS",11259,"DAL",12,)

In [5]:
flightsDS.count()

21751
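The four actions used in this section each hand an ordinary Scala value back to the driver. The sketch below makes the return types explicit on a tiny in-memory RDD. The names `sketchSc` and `sample` are hypothetical, and the sample rows are made up for illustration; `SparkContext.getOrCreate` should reuse the notebook's existing context if one is running and otherwise start a local one.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local master is acceptable when no context exists yet.
val conf = new SparkConf().setMaster("local[*]").setAppName("actions-sketch")
val sketchSc = SparkContext.getOrCreate(conf)

// Made-up sample data, split over two partitions.
val sample = sketchSc.parallelize(Seq("a,1", "b,2", "c,3", "d,4"), numSlices = 2)

val n: Long = sample.count()                  // number of elements in the RDD
val head: String = sample.first()             // the first element only
val firstTwo: Array[String] = sample.take(2)  // the first two elements, as a local array
val all: Array[String] = sample.collect()     // every element, copied to the driver
```

Because `collect` copies the whole RDD into driver memory, `take(n)` is usually the safer way to peek at a large dataset.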

#### Filtering the Header

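Filter is a transformation, like map and flatMap. A transformation does not modify the RDD it is called on, because RDDs are immutable; it returns a new RDD that describes the result. Transformations are also lazy: nothing is computed until an action such as count or take is called.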
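Transformations return a new RDD and leave the original untouched, and Spark defers the actual work until an action asks for a result. A minimal sketch of both points follows, assuming a failure-free local run. The names `sketchSc`, `words`, and `evaluated` are hypothetical, and the accumulator is used here only as a counter to observe when the filter function really runs.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: reuse the running context, or start a local one if none exists.
val conf = new SparkConf().setMaster("local[*]").setAppName("lazy-sketch")
val sketchSc = SparkContext.getOrCreate(conf)

val words = sketchSc.parallelize(Seq("", "spark", "", "rdd"))
val evaluated = sketchSc.longAccumulator("evaluated")

// Defining the filter only records the transformation; no element is visited yet.
val nonEmpty = words.filter { w =>
  evaluated.add(1)
  w.nonEmpty
}
val before = evaluated.sum     // 0: the filter function has not run

val kept = nonEmpty.count()    // the action triggers the computation
val after = evaluated.sum      // one increment per element, assuming no task retries
val original = words.count()   // the source RDD still holds all four elements
```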

In [6]:
// The first line of the file is the header row.
// Dropping every line that contains the column name UNIQUE_CARRIER leaves only the data rows,
// which are then split on commas into arrays of fields.
val newFlightsDS = flightsDS.filter(x => !x.contains("UNIQUE_CARRIER")).map(x => x.split(","))

In [7]:
newFlightsDS.take(10)

Array(Array("04Q", "Tradewind Aviation", 10135, "ABE", 10721, "BOS", 12), Array("04Q", "Tradewind Aviation", 10154, "ACK", 12197, "HPN", 12), Array("04Q", "Tradewind Aviation", 10154, "ACK", 12197, "HPN", 12), Array("04Q", "Tradewind Aviation", 10154, "ACK", 15167, "TEB", 12), Array("04Q", "Tradewind Aviation", 10198, "AGC", 12197, "HPN", 12), Array("04Q", "Tradewind Aviation", 10208, "AGS", 12197, "HPN", 12), Array("04Q", "Tradewind Aviation", 10257, "ALB", 15167, "TEB", 12), Array("04Q", "Tradewind Aviation", 10322, "AOO", 15167, "TEB", 12), Array("04Q", "Tradewind Aviation", 10423, "AUS", 11259, "DAL", 12), Array("04Q", "Tradewind Aviation", 10431, "AVL", 12197, "HPN", 12))
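The substring filter in In [6] would also drop any data row that happened to contain the text UNIQUE_CARRIER. One alternative, sketched here rather than taken from the notebook, removes the header by position with `mapPartitionsWithIndex`. It assumes the header is the first line of the first partition, which should hold for a single file read with `textFile`. The names `sketchSc`, `lines`, `noHeader`, and `rows` are hypothetical, and the three sample lines are a shortened, made-up version of the real columns.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: reuse the running context, or start a local one if none exists.
val conf = new SparkConf().setMaster("local[*]").setAppName("header-sketch")
val sketchSc = SparkContext.getOrCreate(conf)

// Shortened stand-in for the flights file: a header plus two data rows.
val lines = sketchSc.parallelize(Seq(
  "\"UNIQUE_CARRIER\",\"ORIGIN\",\"DEST\",",
  "\"04Q\",\"ABE\",\"BOS\",",
  "\"04Q\",\"ACK\",\"HPN\","
), numSlices = 2)

// Drop the first line of partition 0 only; all other partitions pass through unchanged.
val noHeader = lines.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter
}

// Naive split: correct only while no quoted field contains a comma.
// The surrounding double quotes are stripped from each field.
val rows = noHeader.map(_.split(",").map(_.replace("\"", "")))
```

For files where quoted fields can contain commas, a real CSV parser would be a safer choice than `split(",")`.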

#### Basic Transformations

In [8]:
val sentences = Seq("", "Where do we go from here?", "Right-down, slightly to the left of Mars.", "I'm the man from the planet Marzipan.")
val sentencesDS = sc.parallelize(sentences)

In [9]:
val flatSentencesDS = sentencesDS.filter(x => x != "").flatMap(x => x.split(" "))

In [10]:
flatSentencesDS.take(6)

Array(Where, do, we, go, from, here?)
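To see what `flatMap` in In [9] adds over `map`, the sketch below applies both to the same split function. It uses two of the sentences from In [8]; the names `sketchSc`, `ds`, `nested`, and `flat` are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: reuse the running context, or start a local one if none exists.
val conf = new SparkConf().setMaster("local[*]").setAppName("flatmap-sketch")
val sketchSc = SparkContext.getOrCreate(conf)

val ds = sketchSc.parallelize(Seq("", "Where do we go from here?"))

// map keeps one output element per input element: an array of words per sentence.
val nested = ds.filter(_.nonEmpty).map(_.split(" "))

// flatMap flattens those arrays: one output element per word.
val flat = ds.filter(_.nonEmpty).flatMap(_.split(" "))

val nestedCount = nested.count()  // counts sentences
val flatCount = flat.count()      // counts words
```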