# Simple Spark test: read and filter a tiny CSV file

# <span style="color:red"> NOTE </span>

Set the `context_already_defined` flag to match the environment in which this notebook runs.

If this notebook is started against a PySpark session, a SparkSession called `spark`
(and the SparkContext behind it) is already defined. Attempts to create another
SparkContext will fail, because only one SparkContext can be active per JVM.

If this notebook is meant to start its own Spark context in local mode,
set `context_already_defined = False`, since the context must then be created here.

In [1]:
from pyspark.sql.types import *

In [2]:
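# SQLContext is used in both branches, so import it unconditionally.
from pyspark.sql import SQLContext

context_already_defined = True

if context_already_defined:
    # Reuse the SparkContext behind the existing `spark` session.
    sc = spark.sparkContext
    sqlc = SQLContext(sc)
else:
    from pyspark import SparkConf, SparkContext

    # Start a local Spark context that uses all available cores.
    conf = SparkConf().setAppName("msmap-filter").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    sqlc = SQLContext(sc)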

## Create a tiny CSV file and read it into a PySpark DataFrame

In [3]:
tiny_csv="""
A,B
1,4.45
2,4.55
3,7.7
4,8.2"""

with open('test_data.csv', 'w') as tiny_csv_file:
    tiny_csv_file.write(tiny_csv)
    
! cat test_data.csv

A,B
1,4.45
2,4.55
3,7.7
4,8.2


In [4]:
test_file = 'test_data.csv'

ms_schema = StructType([StructField("A", ByteType()),
                        StructField("B", FloatType())])

test_data = sqlc.read.csv(test_file, schema=ms_schema, header="true")
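To make the effect of `ms_schema` concrete, here is a plain-Python sketch (no Spark involved) of what reading this file with that schema amounts to: the blank first line left by the leading newline in `tiny_csv` is skipped (the `show` output below suggests Spark's reader skips it too), the first remaining line is the header, `A` is parsed as a signed 8-bit integer (`ByteType`) and `B` is rounded to a 32-bit float (`FloatType`). The helpers `to_byte`, `to_float32` and `read_with_schema` are hypothetical illustrations, not Spark APIs; Spark itself handles malformed values according to its CSV `mode` option (`PERMISSIVE` by default) instead of raising.

```python
import csv
import struct

# Same contents as the notebook's tiny_csv string, including the leading newline.
TINY_CSV = "\nA,B\n1,4.45\n2,4.55\n3,7.7\n4,8.2"


def to_byte(text):
    """Mimic ByteType: a signed 8-bit integer in [-128, 127]."""
    value = int(text)
    if not -128 <= value <= 127:
        raise ValueError(f"{value} does not fit in a signed byte")
    return value


def to_float32(text):
    """Mimic FloatType: round the parsed value to IEEE 754 single precision."""
    return struct.unpack("<f", struct.pack("<f", float(text)))[0]


def read_with_schema(path):
    """Read the CSV, skip blank lines, and apply the (A: byte, B: float) casts."""
    with open(path, newline="") as f:
        lines = [line for line in f if line.strip()]
    reader = csv.DictReader(lines)  # first non-blank line is the header
    return [(to_byte(r["A"]), to_float32(r["B"])) for r in reader]


with open("test_data.csv", "w") as f:
    f.write(TINY_CSV)

rows = read_with_schema("test_data.csv")
for a, b in rows:
    print(a, b)
```

The explicit casts are the point of the sketch: with a schema, nothing is inferred from the data, so a value such as `300` in column `A` would simply not fit the declared type.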

In [5]:
test_data.show(2)

+---+----+
|  A|   B|
+---+----+
|  1|4.45|
|  2|4.55|
+---+----+
only showing top 2 rows



## Inspect the application's Spark UI for the stage execution trace and SQL plan

## Stage Details

```
org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:280)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:745)
```


## SQL for stage

WholeStageCodegen: Scan csv (number of output rows: 4; number of files: 1; metadata time (ms): 6)

```
== Parsed Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Relation[A#0,B#1] csv

== Analyzed Logical Plan ==
A: tinyint, B: float
GlobalLimit 3
+- LocalLimit 3
   +- Relation[A#0,B#1] csv

== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Relation[A#0,B#1] csv

== Physical Plan ==
CollectLimit 3
+- *FileScan csv [A#0,B#1] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:<edited>/test_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:tinyint,B:float>
```
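The plans above apply a limit of 3 even though the cell called `show(2)`. Spark's `Dataset.showString` takes one row more than it displays, so that it can tell whether to append the "only showing top 2 rows" footer. A plain-Python sketch of that idea follows; the `preview` helper is hypothetical and only illustrates the n + 1 trick, not Spark's actual implementation.

```python
from itertools import islice


def preview(rows, n):
    """Fetch at most n + 1 rows, display n, and report whether more exist."""
    fetched = list(islice(rows, n + 1))  # plays the role of the limit of n + 1 in the plan
    shown = fetched[:n]
    truncated = len(fetched) > n  # an extra row means there is more data
    return shown, truncated


data = [(1, 4.45), (2, 4.55), (3, 7.7), (4, 8.2)]
shown, truncated = preview(iter(data), 2)
print(shown)
if truncated:
    print("only showing top 2 rows")
```

Fetching a single extra row keeps the cost of the check constant, instead of counting the whole dataset.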

## Simple numerical filter on the underlying RDD

In [6]:
# Keep the DataFrame intact (so the cell can be re-run) and filter its underlying RDD of Rows.
even_rows = test_data.rdd.filter(lambda row: row['A'] % 2 == 0)
even_rows.take(4)

[Row(A=2, B=4.550000190734863), Row(A=4, B=8.199999809265137)]
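The odd-looking values in column `B` come from the schema: `FloatType` is a 32-bit IEEE 754 float, so 4.55 and 8.2 are stored as the nearest single-precision values, and once the rows reach Python (whose `float` is 64-bit) those values are printed in full. A minimal standard-library sketch reproducing this with `struct`; the helper name `widen_float32` is hypothetical. Declaring `B` as `DoubleType` in the schema would avoid these artifacts.

```python
import struct


def widen_float32(value):
    """Round a 64-bit Python float to the nearest 32-bit float, then widen it back."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


for literal in (4.55, 8.2):
    print(literal, "->", widen_float32(literal))
# 4.55 -> 4.550000190734863
# 8.2 -> 8.199999809265137
```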

## Stage Details

```
org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:446)
org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:280)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:745)
```