### Spark Definitive Guide Textbook Notes

#### Chapter 1 - Overview of Big Data and Spark

Apache Spark - a unified computing engine and set of libraries for parallel data processing on computer clusters
* designed to support a wide range of data analytics tasks over the same computing engine, with consistent APIs
* especially helpful for big data, where local computational resources are insufficient

Scope - computing engine. Focuses on performing computations over data, no matter where it resides.

Motivation - collecting data is extremely inexpensive, but processing it requires large, parallel computations, often on clusters of machines.

Spark differentiator - focuses on just the computation, trying to be flexible with the sources of data it can work on

#### Chapter 2 - A Gentle Introduction to Spark

##### Basic Architecture

Single machines do not have enough power and resources to perform computations on large amounts of information. A **cluster** pools the resources of many machines together, giving us the ability to use all the cumulative resources as if they were a single computer. Spark manages and coordinates the execution of tasks on data across clusters of computers.

##### Spark Applications

Cluster manager:
* keeps track of the resources available

Driver:
* maintains information about the Spark application
* responds to the user's program or input
* analyses, distributes and schedules work across the executors

Executors:
* responsible for actually carrying out the work
* execute the code assigned to them and report the state of the computation back to the driver
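The division of labour above can be mimicked in plain Python. This is only a toy sketch, not how Spark is implemented: a thread pool stands in for the executors, and the names `driver` and `run_task` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def run_task(partition):
    """Work carried out by an 'executor': process one partition, report back."""
    return sum(x * x for x in partition)


def driver(data, num_partitions, num_executors):
    """Toy 'driver': splits the work, schedules it, combines the reports."""
    # Split the data into roughly equal chunks, one per task.
    partitions = [data[i::num_partitions] for i in range(num_partitions)]
    # The pool plays the role of the executors the cluster manager granted us.
    with ThreadPoolExecutor(max_workers=num_executors) as pool:
        partial_results = list(pool.map(run_task, partitions))
    return sum(partial_results)


print(driver(list(range(100)), num_partitions=4, num_executors=2))  # 328350
```

The driver never processes a row itself; it only decides how the work is split and merges what the workers report.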

##### Language APIs

Language APIs make it possible to run Spark code from various programming languages. At their core, the executors run on the JVM.

In [11]:
import findspark

findspark.init()  # locate the local Spark installation and add it to sys.path
from pyspark.sql import SparkSession

# create (or reuse) the session that serves as the entry point to Spark
spark = SparkSession.builder.appName("Example").getOrCreate()


##### Dataframe

The most common structured API; it simply represents a table of data with rows and columns. Similar to a spreadsheet, though it can span thousands of computers.

Partitions - chunks of the table's rows that sit on each computer in the cluster. Parallelism is limited by the number of partitions.
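A rough way to reason about this limit, written as plain Python rather than anything Spark exposes: the number of tasks that can run at once is bounded by both the partition count and the number of executor cores. The function name `max_parallel_tasks` and the one-task-per-core assumption are illustrative only.

```python
def max_parallel_tasks(num_partitions: int, num_executor_cores: int) -> int:
    """Upper bound on concurrently running tasks in one stage.

    Assumption (illustrative): one task processes one partition and
    occupies one executor core at a time.
    """
    return min(num_partitions, num_executor_cores)


# One partition: a parallelism of one, however large the cluster.
print(max_parallel_tasks(num_partitions=1, num_executor_cores=1000))
# Many partitions but a single core: still a parallelism of one.
print(max_parallel_tasks(num_partitions=1000, num_executor_cores=1))
# Otherwise the smaller of the two is the limit.
print(max_parallel_tasks(num_partitions=8, num_executor_cores=4))
```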

The core data structures are immutable; to "change" a DataFrame, you apply transformations that describe a new one.

In [8]:
myRange = spark.range(1000).toDF('number')
myRange.show(5)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
+------+
only showing top 5 rows


In [9]:
# example transformation

divisby2 = myRange.where('number % 2 == 0')
divisby2.show(5)

+------+
|number|
+------+
|     0|
|     2|
|     4|
|     6|
|     8|
+------+
only showing top 5 rows


##### Transformation types

Narrow transformation - each input partition will contribute to only one output partition

Wide transformation - input partitions contribute to many output partitions. Spark exchanges partitions across the cluster
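The difference can be simulated on plain Python lists, treating each inner list as one partition. This is a sketch under stated assumptions, not Spark's shuffle: the data, the function names and the character-code "hash" are all made up for illustration.

```python
from collections import defaultdict

# Hypothetical data: two input partitions of (key, value) records.
partitions = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("b", 4), ("c", 5), ("a", 6)],
]


def narrow_filter(parts, predicate):
    """Narrow: each input partition produces exactly one output partition."""
    return [[rec for rec in part if predicate(rec)] for part in parts]


def wide_group_sum(parts, num_output_partitions):
    """Wide: records are routed by key, so one input partition can feed
    several output partitions (this routing step is the shuffle)."""
    shuffled = [defaultdict(int) for _ in range(num_output_partitions)]
    for part in parts:
        for key, value in part:
            # Sum of character codes: a deterministic stand-in for a hash.
            target = sum(map(ord, key)) % num_output_partitions
            shuffled[target][key] += value
    return [dict(p) for p in shuffled]


filtered = narrow_filter(partitions, lambda rec: rec[1] > 2)
grouped = wide_group_sum(partitions, num_output_partitions=2)
print(filtered)
print(grouped)
```

In the filter, no record ever leaves its partition. In the grouped sum, the records for key `a` start in both input partitions and must end up together, which is why data has to move.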

##### Lazy evaluation

Spark waits until the very last moment to execute the graph of computation instructions. This means Spark can optimise the entire data flow from end to end.

Actions - used to trigger the computation
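The split between transformations and actions can be illustrated with a toy class in plain Python. It is a minimal sketch of the idea only; `LazyPlan` and its methods are hypothetical names, and none of Spark's optimisation is modelled.

```python
class LazyPlan:
    """Toy stand-in for a DataFrame: transformations are recorded, not run."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    def where(self, predicate):
        # Transformation: returns a new plan and touches no data.
        return LazyPlan(self._source, self._steps + (("filter", predicate),))

    def select(self, func):
        # Transformation: also just extends the recorded plan.
        return LazyPlan(self._source, self._steps + (("map", func),))

    def collect(self):
        # Action: only now is the recorded plan executed.
        rows = iter(self._source)
        for kind, fn in self._steps:
            rows = filter(fn, rows) if kind == "filter" else map(fn, rows)
        return list(rows)


calls = []


def is_even(n):
    calls.append(n)  # records when the predicate actually runs
    return n % 2 == 0


plan = LazyPlan(range(10)).where(is_even).select(lambda n: n * 10)
print(len(calls))   # 0: building the plan executed nothing
result = plan.collect()
print(result)       # [0, 20, 40, 60, 80]
```

Because the whole chain is known before anything runs, a real engine has the chance to rearrange or combine steps before execution.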

In [39]:
# using schema inference, so Spark takes a best guess at what the schema of the dataframe should be

# reading data is a lazy operation

flightData2015 = spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("data/flight-data/csv/2015-summary.csv")

In [16]:
# add sort, and view the lazy computation plan
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#31 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#31 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=61]
      +- FileScan csv [DEST_COUNTRY_NAME#29,ORIGIN_COUNTRY_NAME#30,count#31] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/fraserbytheway/PycharmProjects/DataML/Data/Spark/data/flig..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [33]:
# set the number of partitions produced by a shuffle (the default is 200)
spark.conf.set("spark.sql.shuffle.partitions", "5")

flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [35]:
# Change dataframe into a table or view
flightData2015.createOrReplaceTempView("flight_data_2015")

In [36]:
# SQL Query

sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.count()

sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#29], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#29, 5), ENSURE_REQUIREMENTS, [plan_id=182]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#29], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#29] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/fraserbytheway/PycharmProjects/DataML/Data/Spark/data/flig..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#29], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#29, 5), ENSURE_REQUIREMENTS, [plan_id=195]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#29], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#29] Batched: false, DataFilters: [], Format: CSV, Location: InMe

In [45]:
# find max (effectively filtering down to one row)

# alias the import so it does not shadow Python's built-in max
from pyspark.sql.functions import max as spark_max

flightData2015.select(spark_max('count')).take(1)

[Row(max(count)=370002)]

The execution plan is a directed acyclic graph (DAG) of transformations.

In [50]:
# more complicated expression

from pyspark.sql.functions import desc

flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .explain()

flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .show()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#162L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#115,destination_total#162L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#115], functions=[sum(count#117)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#115, 5), ENSURE_REQUIREMENTS, [plan_id=309]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#115], functions=[partial_sum(count#117)])
            +- FileScan csv [DEST_COUNTRY_NAME#115,count#117] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/fraserbytheway/PycharmProjects/DataML/Data/Spark/data/flig..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>


+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|           

#### Chapter 3 - A Tour of Spark's Toolset

##### Datasets - type-safe structured APIs

Used for writing statically typed code in Java and Scala. Type safety means you cannot accidentally treat the objects in a Dataset as being of a class other than the one initially put in.

* a `Dataset[Person]` is guaranteed to contain objects of class `Person`

##### Structured Streaming

Take the same operations performed in batch mode using Spark's structured APIs and run them in a streaming fashion. This can reduce latency and allow for incremental processing.

With streaming, data keeps arriving; Spark treats the incoming data as rows being appended to an unbounded table.
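The unbounded-table idea can be sketched in plain Python: each arriving batch is treated as newly appended rows, and a running aggregate is updated incrementally instead of being recomputed from scratch. The batches and variable names below are invented for illustration; this is not how Spark stores its streaming state.

```python
from collections import defaultdict

# Hypothetical micro-batches of (customer_id, amount) rows arriving over time.
batches = [
    [("c1", 10.0), ("c2", 5.0)],
    [("c1", 2.5)],
    [("c3", 7.0), ("c2", 1.0)],
]

running_totals = defaultdict(float)  # state kept between batches
snapshots = []

for batch in batches:
    # Only the new rows are processed; the aggregate is updated in place.
    for customer_id, amount in batch:
        running_totals[customer_id] += amount
    # Emitting the whole result table each time mirrors "complete" output mode.
    snapshots.append(dict(running_totals))

for snapshot in snapshots:
    print(snapshot)
```

Each snapshot is the full result table as of that batch, so a query against it always sees totals for everything that has arrived so far.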

In [52]:
# structured streaming

staticDataFrame = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

26/02/10 15:29:08 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: data/retail-data/by-day/*.csv.
java.io.FileNotFoundException: File data/retail-data/by-day/*.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:980)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1301)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:970)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.sinks.FileStreamSink$.hasMetadata(FileStreamSink.scala:58)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:384)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catal

In [54]:
# some data grouping operations

from pyspark.sql.functions import window, column, desc, col

spark.conf.set("spark.sql.shuffle.partitions", "5")

staticDataFrame\
    .selectExpr(
    "CustomerID",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
    .groupBy(
    col("CustomerID"), window(col("InvoiceDate"), "1 day"))\
    .sum("total_cost")\
    .show(5)

+----------+--------------------+------------------+
|CustomerID|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   13417.0|{2011-12-04 11:00...|            404.83|
|   12782.0|{2011-12-04 11:00...|252.24999999999997|
|   16513.0|{2011-12-04 11:00...|             121.8|
|   15392.0|{2011-12-05 11:00...|304.40999999999997|
|   15290.0|{2011-12-05 11:00...|263.02000000000004|
+----------+--------------------+------------------+
only showing top 5 rows


In [55]:
# using streaming instead
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load("data/retail-data/by-day/*.csv")  # relative path, matching the static read above

In [56]:
streamingDataFrame.isStreaming

True

In [58]:
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")

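Triggering the streaming operation. Streaming actions are a bit different from static ones: starting the stream populates data somewhere (here, an in-memory table) instead of returning a result directly.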

In [91]:

purchaseByCustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()

26/02/10 15:58:40 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/65/kfyt6zd17rnft11mhd0p3ysc0000gn/T/temporary-f6165344-8dd3-42ce-ac71-221d32607ae5. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
26/02/10 15:58:40 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


IllegalArgumentException: Cannot start query with name customer_purchases as a query with that name is already active in this SparkSession

In [69]:
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")\
.show(5)

+----------+------+---------------+
|CustomerId|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



In [70]:
purchaseByCustomerPerHour.writeStream\
.format("console")\
.queryName("customer_purchases_2")\
.outputMode("complete")\
.start()

26/02/10 15:39:43 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/65/kfyt6zd17rnft11mhd0p3ysc0000gn/T/temporary-5654c80b-46df-405c-b3cf-213552bb2f3d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
26/02/10 15:39:43 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


IllegalArgumentException: Cannot start query with name customer_purchases_2 as a query with that name is already active in this SparkSession

##### Machine Learning and Advanced Analytics

Spark can perform large-scale machine learning with a built-in library of machine learning algorithms called MLlib.

In [71]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [72]:
from pyspark.sql.functions import date_format, col

preppedDataFrame = staticDataFrame\
    .na.fill(0)\
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
    .coalesce(5) # used to efficiently reduce the number of partitions

In [73]:
# split into train and test data
trainDataFrame = preppedDataFrame\
    .where("InvoiceDate < '2011-07-01'")

testDataFrame = preppedDataFrame\
    .where("InvoiceDate >= '2011-07-01'")

In [75]:
trainDataFrame.count()

245903

In [76]:
testDataFrame.count()

296006

Transforming the day of the week into a one-hot encoded (OHE) numeric feature

In [77]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

indexer = StringIndexer()\
.setInputCol("day_of_week")\
.setOutputCol("day_of_week_index")

In [78]:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder()\
.setInputCol("day_of_week_index")\
.setOutputCol("day_of_week_encoded")

Creating input vector, a set of numeric columns

In [80]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
    .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
    .setOutputCol("features")

Pipeline to make transformations reproducible

In [81]:
from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
    .setStages([indexer, encoder, vectorAssembler])

In [82]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

                                                                                

In [83]:
transformedTraining = fittedPipeline.transform(trainDataFrame)

##### Cache

With cache, Spark computes the DataFrame once, saves it to memory, and reuses it for subsequent actions, so it doesn't need to recompute the entire DataFrame each time.
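The effect of caching can be shown with a toy class in plain Python that counts how often the underlying data is recomputed. `ToyFrame` is a hypothetical stand-in, not Spark's mechanism; like the real `cache()`, the toy version only marks the data for caching and stores it on the next action.

```python
class ToyFrame:
    """Toy stand-in for a DataFrame whose contents are recomputed per action."""

    def __init__(self, compute):
        self._compute = compute      # function that produces the rows
        self._use_cache = False
        self._cached_rows = None
        self.compute_calls = 0       # how many times rows were recomputed

    def cache(self):
        # Only marks the data for caching; nothing is materialised yet.
        self._use_cache = True
        return self

    def _rows(self):
        if self._use_cache and self._cached_rows is not None:
            return self._cached_rows
        self.compute_calls += 1
        rows = self._compute()
        if self._use_cache:
            self._cached_rows = rows
        return rows

    def count(self):
        return len(self._rows())

    def total(self):
        return sum(self._rows())


uncached = ToyFrame(lambda: [n * n for n in range(1000)])
uncached.count()
uncached.total()

cached = ToyFrame(lambda: [n * n for n in range(1000)]).cache()
cached.count()
cached.total()

print(uncached.compute_calls, cached.compute_calls)  # 2 1
```

Two actions on the uncached frame trigger two full computations, while the cached frame computes once and serves the second action from memory.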

In [None]:
transformedTraining.cache()

In [85]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
.setK(20)\
.setSeed(0)

##### Training Machine Learning Models

Training is a two-phase process: first initialise an untrained model, then train it. Each algorithm follows the naming pattern `Algorithm` for the untrained version and `AlgorithmModel` for the trained version (here, `KMeans` and `KMeansModel`).
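The same two-phase shape can be sketched in plain Python. The classes `MeanCenterer` and `MeanCentererModel` are hypothetical and exist only to show the naming pattern; they are not part of MLlib.

```python
class MeanCenterer:
    """Untrained 'Algorithm': holds configuration only, no learned state."""

    def fit(self, values):
        # Training learns parameters from data and returns a new object.
        mean = sum(values) / len(values)
        return MeanCentererModel(mean)


class MeanCentererModel:
    """Trained 'AlgorithmModel': holds the learned parameters."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [v - self.mean for v in values]


centerer = MeanCenterer()                # phase 1: initialise
model = centerer.fit([1.0, 2.0, 3.0])    # phase 2: train
print(model.mean)                        # 2.0
print(model.transform([2.0, 4.0]))       # [0.0, 2.0]
```

Keeping the untrained and trained objects separate means one configured algorithm can be fitted on different data without the results overwriting each other.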

In [86]:
kmModel = kmeans.fit(transformedTraining)

26/02/10 15:54:15 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [90]:
transformedTest = fittedPipeline.transform(testDataFrame)

##### Lower-Level APIs

Spark has a variety of lower-level primitives that allow arbitrary Java and Python object manipulation via RDDs.

Resilient Distributed Datasets (RDDs)
* reveal physical execution characteristics
* can be used to parallelise raw data


#### Chapter 4 - Structured APIs

The Structured APIs are a tool for manipulating all sorts of data. They comprise three core types of distributed collection APIs:
* Datasets
* Dataframes
* SQL tables and views

Spark recap: Spark is a distributed programming model in which the user specifies transformations. Multiple transformations build up a directed acyclic graph of instructions. An action begins the process of executing that graph of instructions, as a single job, by breaking it down into stages and tasks to execute across the cluster. The logical structures that we manipulate with transformations and actions are DataFrames and Datasets.

DataFrames and Datasets are distributed, table-like collections with well-defined rows and columns. Properties:

* schemas - define the column names and types of a DataFrame. Can be defined manually or read from a data source

##### Spark Types

Spark is effectively a programming language of its own. Internally it uses an engine called Catalyst, which maintains its own type information. Spark converts an expression written in an input language into its internal Catalyst representation.

##### Dataframes vs Datasets

DataFrame - only checks whether types line up with those specified in the schema at runtime

Dataset - checks whether types conform at compile time. Only available to JVM languages (Scala and Java)

##### Columns

Columns represent a simple type like an integer or a string, a complex type like an array or map, or a null value. Spark tracks all of this type information and offers a variety of ways to transform columns.

##### Rows

A record of data

##### Types - See page 60 for useful manual

##### Structured API Execution

1. Write dataframe code
2. If valid, spark converts to a logical plan
3. Spark transforms the logical plan into a physical plan, checking for optimisations along the way
4. Spark then executes the physical plan (RDD manipulations) on the cluster


##### Logical Plan
The logical plan only represents a set of abstract transformations that do not refer to executors or drivers. Its purpose is purely to convert the user's set of expressions into the most optimised version.

Spark uses the catalog, a repository of all table and DataFrame information, to resolve
columns and tables in the analyzer. The analyzer might reject the unresolved logical plan if the
required table or column name does not exist in the catalog. If the analyzer can resolve it, the
result is passed through the Catalyst Optimizer, a collection of rules that attempt to optimize the
logical plan by pushing down predicates or selections.
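To make "pushing down predicates" concrete, here is a toy rule in plain Python that moves a filter ahead of a projection when doing so cannot change the result. It is a sketch of the idea only and not Catalyst's implementation; the plan representation and the names `push_down_filters` and `execute` are invented for illustration.

```python
# A logical plan is an ordered list of steps applied to the scanned rows:
# ("project", [columns]) keeps columns; ("filter", column, predicate) keeps rows.


def push_down_filters(plan):
    """Toy rule: move each filter ahead of a directly preceding projection.

    Safe here because a projection only drops columns, and the swap is
    applied only when the filtered column survives the projection anyway.
    """
    plan = list(plan)
    changed = True
    while changed:
        changed = False
        for i in range(len(plan) - 1):
            first, second = plan[i], plan[i + 1]
            if (first[0] == "project" and second[0] == "filter"
                    and second[1] in first[1]):
                plan[i], plan[i + 1] = second, first
                changed = True
    return plan


def execute(plan, rows):
    """Run the steps in order over a list of dict rows."""
    for step in plan:
        if step[0] == "project":
            rows = [{c: r[c] for c in step[1]} for r in rows]
        else:
            _, column, predicate = step
            rows = [r for r in rows if predicate(r[column])]
    return rows


rows = [
    {"dest": "US", "origin": "RO", "count": 15},
    {"dest": "US", "origin": "HR", "count": 1},
    {"dest": "EG", "origin": "US", "count": 15},
]
plan = [("project", ["dest", "count"]), ("filter", "count", lambda c: c > 10)]
optimised = push_down_filters(plan)

print([step[0] for step in optimised])               # ['filter', 'project']
print(execute(plan, rows) == execute(optimised, rows))  # True
```

Both plans return the same rows, but the rewritten one discards rows before projecting, so later steps handle less data.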

##### Physical Plan

After successfully creating an optimised logical plan, Spark begins the physical planning process. The physical plan specifies how the logical plan will execute on the cluster, by generating different physical execution strategies and comparing them through a cost model.

##### Execution

Once Spark has selected a physical plan, it executes the code over RDDs.

#### Chapter 5 - Basic Structured Operations

##### Basic Structure

A DataFrame consists of a series of records (like rows in a table) and columns (like spreadsheet columns) that represent a computation expression that can be performed on each individual record in the Dataset. Schemas define the name and type of data in each column. Partitioning defines the layout of the DataFrame across the cluster.

In [92]:
df = spark.read.format("json").load("data/flight-data/json/2015-summary.json")

In [93]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



We can either let a data source define the schema (schema-on-read) or define it explicitly ourselves. Schema inference is less safe, as types are sometimes inferred incorrectly (for example, a long read as an integer). The schema format is shown below.

In [94]:
spark.read.format("json").load("data/flight-data/json/2015-summary.json").schema

StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', LongType(), True)])

A schema is a `StructType` made up of a number of fields, `StructField`s. Each has a name, a type, and a Boolean flag specifying whether the column can contain missing or null values; the user can optionally attach metadata to the column.

In [96]:
# Manual Schema Specification

from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", LongType(), False, metadata={"hello":"world"})
])
df = spark.read.format("json").schema(myManualSchema)\
.load("data/flight-data/json/2015-summary.json")

##### Columns and Expressions

Columns are logical constructions that simply represent a value computed on a per-record basis by means of an expression.

In [97]:
# ways to construct and refer to a column:
from pyspark.sql.functions import col, column
col("someColumnName")
column("someColumnName")

Column<'someColumnName'>

Expressions - a set of transformations on one or more values in a record in a DataFrame. The key point is that a column is just an expression; in the simplest case it only refers to a column by name, so `expr("someCol")` is equivalent to `col("someCol")`.

In [99]:
# access columns
spark.read.format("json").load("data/flight-data/json/2015-summary.json")\
.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

##### Records and Rows

Each row in a DataFrame is a single record. Spark represents this as an object of type `Row`. Spark manipulates `Row` objects using column expressions in order to produce usable values.

In [100]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [101]:
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)
myRow[0]

'Hello'

##### Dataframe Transformations



In [102]:
# Create a dataframe

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("some", StringType(), True),
StructField("col", StringType(), True),
StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|NULL|    1|
+-----+----+-----+



The `select` method selects columns within the DataFrame

In [103]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows


In [105]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

# can refer to columns in a number of different ways
from pyspark.sql.functions import expr, col, column
df.select(
expr("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"))\
.show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows
+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows


In [106]:
# Using select expressions to build up more complex dataframes

df.selectExpr(
"*", # all original columns
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
.show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows


In [107]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



Using literals

In [108]:
from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows


**Literals come up when you need to check whether a value is greater than some constant or other programmatically created variable**

In [None]:
# alternate method, using withColumn

In [109]:
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows


Use `.withColumnRenamed` to rename columns.

By default Spark is case insensitive, though it can be made case sensitive by setting the config `spark.sql.caseSensitive` to true.

Use `df.drop("col")` to drop specific columns.

# Up to page 80