In [1]:
from pyspark.sql import SparkSession

- Data Preparation for Notebook
```Shell
$ mkdir linkage
$ cd linkage/
$ curl -L -o donation.zip https://bit.ly/1Aoywaq
$ unzip donation.zip
$ unzip 'block_*.zip'
```

In [2]:
# Start (or reuse) a SparkSession: getOrCreate() returns the already-running session if there is one
spark=SparkSession.builder.appName('donation').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/19 19:22:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 1. Data IO
- reading and writing dataframes in a variety of formats via the `DataFrameReader` and `DataFrameWriter` APIs
    - **parquet**: Leading columnar-oriented data storage format (default option in Spark)
    - **orc**: Another columnar-oriented data storage format
    - **json**: Supports much of the same schema-inference functionality as the CSV format
    - **jdbc**: Connects to a relational database via the JDBC data connection standard
    - **avro**: Provides efficient message serialization and deserialization when using a streaming source such as Apache Kafka
    - **text**: Maps each line of a file to a dataframe with a single column of type string
    - **image**: Loads image files from a directory as a dataframe with one column, containing image data stored as image schema
    - **libsvm**: Popular text file format for representing labeled observations with sparse features
    - **binaryFile**: Reads binary files and converts each file into a single dataframe row holding its raw content and metadata (new in Spark 3.0)
    - **xml**: Simple text-based format for representing structured information such as documents, data, configuration, or books (available via the spark-xml package)

#### 1.1. Load Data 
- To read data, you access the **DataFrameReader** API through the `read` attribute of a SparkSession instance
```Python
# Method 1: format() & load()
d1 = spark.read.format("json").load("file.json")
# Method 2: json(), csv()
d2 = spark.read.json("file.json")
```

#### 1.2. Write Data
- To write data out again, you access the **DataFrameWriter** API via the `write` attribute of a DataFrame
```Python
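# Method 1: format() & save()
d1.write.format("parquet").save("file.parquet")
# Method 2: format-specific shortcut (equivalent; running both would fail,
# because the default save mode refuses to write to an existing path)
d1.write.parquet("file.parquet")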
```
- By default, Spark throws an error if you try to save a dataframe to a path that already exists.
    - You can control Spark’s behavior in this situation with the writer's `mode` method:
        - `"overwrite"`: replace the existing data
        - `"append"`: add the DataFrame's rows to the existing data
        - `"ignore"`: skip the write silently and leave the existing data in place
        - `"error"` / `"errorifexists"`: the default, raise an error

```Python
d2.write.format("parquet").mode("overwrite").save("file.parquet")
```

#### 1.3. Select
```Python
df.select("col_1", "col_2")
```

#### 1.4. `option`
- Each file format has its own set of options, set with `.option(key, value)` on the reader or writer
    - For CSV, `.option("nullValue", "?")` tells the reader to treat the string `"?"` as a `null` value

In [12]:
parsed = spark.read.option("header", "true").option("nullValue", "?").\
          option("inferSchema", "true").csv("../data/linkage/block_*.csv")

                                                                                

In [13]:
# To see the inferred type for each column
parsed.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: double (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm: integer (nullable = true)
 |-- cmp_by: integer (nullable = true)
 |-- cmp_plz: integer (nullable = true)
 |-- is_match: boolean (nullable = true)



#### 1.5. Defining a schema using `StructType` and `StructField`
- If you know the schema that you want to use for a file ahead of time, you can specify it instead of inferring it:
    - Method 1: Create an instance of the `pyspark.sql.types.StructType` class (e.g. named `schema`) and pass it to the reader via the `schema()` method.
    - Method 2: Pass the schema as a **DDL (data definition language)** string instead: `"id_1 INT, id_2 INT, cmp_fname_c1 DOUBLE"`
- Why? This can have a significant performance benefit when the dataset is very large, since Spark does not need to make an extra pass over the data to figure out the data type of each column.
```Python
spark.read.schema(schema).csv("...")
```

In [14]:
from pyspark.sql.types import *
# Method 1: define schema via StructType Class
schema = StructType([StructField("id_1", IntegerType(), False),
  StructField("id_2", StringType(), False),
  StructField("cmp_fname_c1", DoubleType(), False)])

In [15]:
schema

StructType([StructField('id_1', IntegerType(), False), StructField('id_2', StringType(), False), StructField('cmp_fname_c1', DoubleType(), False)])

## 2. Analyzing Data with the DataFrame API
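#### Spark Transformation
Lazily define a new DataFrame from an existing one (e.g. `select`, `filter`/`where`, `groupBy`, `withColumn`, `orderBy`); Spark only records the recipe and computes nothing until an action is called.
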
#### Spark Action
Actions cause Spark to execute the recipe of transformations and return a result to the driver (or write it out)
- `show(n)`: prints the first n rows (20 by default) as a table; `first()`: returns the first row
- `take(n)`: returns the first n rows as a list of Row
- `collect()`: returns all the records as a list of Row 
    - **WARNING**: the result must fit in the driver's memory; for extremely large DataFrames, these methods can be *dangerous and cause an out-of-memory* exception
    -  `toPandas()` carries the same risk as `collect()`: it returns the entire contents of the DataFrame to the driver, as a pandas DataFrame
- `count()`: returns the number of rows in a DataFrame   
- `describe(*cols)`: computes statistics (count, mean, stddev, min, max) for the given columns
    -  if no columns are given, it computes statistics for all numeric and string columns
#### Handling `null` values
- `df.fillna(0, subset=good_features)`: replaces nulls with 0, but only in the columns listed in `subset` (in this example, the `good_features` list defined in Section 5.1)

In [21]:
parsed.show(5)

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|14055|94934|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|33948|34740|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|  946|71870|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|64880|71676|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+

In [17]:
parsed.first() # first row of the DataFrame, useful as a sanity check

Row(id_1=3148, id_2=8326, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=1, is_match=True)

In [18]:
parsed.count()

                                                                                

5749132

### 2.1. Cache
- `cache()`: after the data has been parsed once, we’d like to keep it in its parsed form on the cluster so that we don’t have to reparse it for every action. `cache()` is lazy: the data is materialized the first time an action runs afterwards.

In [22]:
parsed.cache()

DataFrame[id_1: int, id_2: int, cmp_fname_c1: double, cmp_fname_c2: double, cmp_lname_c1: double, cmp_lname_c2: double, cmp_sex: int, cmp_bd: int, cmp_bm: int, cmp_by: int, cmp_plz: int, is_match: boolean]

In [24]:
parsed.groupBy('is_match').count().show()



+--------+-------+
|is_match|  count|
+--------+-------+
|    true|  20931|
|   false|5728201|
+--------+-------+



                                                                                

### 2.2. Referring to Columns
- There are two ways to reference the columns of a DataFrame: 
    - literal strings, as in `groupBy("is_match")`
    - `Column` objects, e.g. created with the `col` function from `pyspark.sql.functions`

In [26]:
from pyspark.sql.functions import col
# col() returns a pyspark.sql.Column for the given column name;
# here it is used to sort the result by its "count" column in descending order
parsed.groupBy('is_match').count().orderBy(col('count').desc()).show()

+--------+-------+
|is_match|  count|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+



### 2.3. Dataframe aggregation functions
-  Use the `agg` method of the **DataFrame API** together with the aggregation functions defined in the `pyspark.sql.functions` module

In [27]:
from pyspark.sql.functions import avg, stddev

parsed.agg(avg('cmp_sex'), stddev('cmp_sex')).show()

+-----------------+--------------------+
|     avg(cmp_sex)|stddev_samp(cmp_sex)|
+-----------------+--------------------+
|0.955001381078048| 0.20730111116897781|
+-----------------+--------------------+



### 2.4. Filtering Data with `filter` or `where`
- `where` is an alias for `filter`
    - The condition can be a `Column` expression, or a string containing any expression that would be valid inside a `WHERE` clause in **Spark SQL**. 
```Python
df.filter(col('is_match') == False)

df.where("is_match = true")
```

### 2.5. `withColumn()` 
- `withColumn()` is a DataFrame transformation used to
    - create a new column, or replace an existing column of the same name
    - compute that column's value for every row (combine it with `when`/`otherwise` to change only selected rows)

-  Syntax: `withColumn(colName: str, col: Column) -> DataFrame`
    - `colName`: name of the column to create; if a column with that name already exists, it is replaced.
    - `col`: the `Column` expression that computes the new values.

```Python
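from pyspark.sql.types import DoubleType
# replace the column named col_name with a double-typed version of itself
df = df.withColumn(col_name, df[col_name].cast(DoubleType()))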
```

## 3. Spark SQL
- Before querying a DataFrame with SQL, register it with the **Spark SQL execution** engine under a name (here `"linkage"`) via `createOrReplaceTempView`; the view lasts for the current SparkSession.


In [28]:
parsed.createOrReplaceTempView("linkage")

In [29]:
spark.sql("""
    SELECT is_match, COUNT(*) AS CNT
    FROM linkage
    GROUP BY is_match
    ORDER BY cnt DESC       
""").show()

+--------+-------+
|is_match|    CNT|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+



## 4. Fast Summary Statistics for DataFrames

In [31]:
summary = parsed.describe()

                                                                                

In [33]:
summary.select('summary','cmp_fname_c1', 'cmp_fname_c2').show()

+-------+-------------------+------------------+
|summary|       cmp_fname_c1|      cmp_fname_c2|
+-------+-------------------+------------------+
|  count|            5748125|            103698|
|   mean| 0.7129024704437266|0.9000176718903189|
| stddev|0.38875835961628014|0.2713176105782334|
|    min|                0.0|               0.0|
|    max|                1.0|               1.0|
+-------+-------------------+------------------+



In [34]:
matches = parsed.where("is_match = true") # the string must be a valid Spark SQL WHERE-clause expression
match_summary = matches.describe()

                                                                                

In [36]:
misses = parsed.filter(col('is_match') == False)
miss_summary = misses.describe()

                                                                                

## 5. Pivoting and Reshaping DataFrames
- Convert the DataFrames into pandas DataFrames with `.toPandas()`
- Reshape them with pandas
- Convert the reshaped pandas DataFrames back to Spark DataFrames with `spark.createDataFrame(pandas_df)`

Note: this is safe only because the `summary`, `match_summary`, and `miss_summary` DataFrames are small; a pandas DataFrame lives entirely in the driver's memory. 
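The pandas part of the reshape can be tried without Spark. A minimal sketch on a hypothetical frame shaped like `describe().toPandas()` (one row per statistic, one column per field, values as strings), chaining the same steps used in the cells below:

```Python
import pandas as pd

# Hypothetical frame shaped like describe().toPandas()
desc_p = pd.DataFrame({
    "summary": ["count", "mean"],
    "cmp_sex": ["4", "0.75"],
    "cmp_bd": ["3", "0.5"],
})

wide = (desc_p.set_index("summary")        # statistics become the row labels
              .transpose()                 # fields become rows, statistics become columns
              .reset_index()               # field names move from the index into a column
              .rename(columns={"index": "field"})
              .rename_axis(None, axis=1))  # drop the leftover "summary" axis name
```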

In [37]:
summary_p = summary.toPandas()

In [40]:
summary_p.shape

(5, 12)

In [41]:
summary_p.head()

|   | summary | id_1 | id_2 | cmp_fname_c1 | cmp_fname_c2 | cmp_lname_c1 | cmp_lname_c2 | cmp_sex | cmp_bd | cmp_bm | cmp_by | cmp_plz |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 5749132.0 | 5749132.0 | 5748125.0 | 103698.0 | 5749132.0 | 2464.0 | 5749132.0 | 5748337.0 | 5748337.0 | 5748337.0 | 5736289.0 |
| 1 | mean | 33324.48559643438 | 66587.43558331935 | 0.7129024704437266 | 0.9000176718903189 | 0.3156278193080383 | 0.3184128315317443 | 0.955001381078048 | 0.2244652670850717 | 0.488855298497635 | 0.2227485966810923 | 0.0055286614743434 |
| 2 | stddev | 23659.859374488064 | 23620.48761326969 | 0.3887583596162801 | 0.2713176105782334 | 0.3342336339615828 | 0.3685670662006653 | 0.2073011111689778 | 0.4172297223846263 | 0.4998758236779031 | 0.4160909629831756 | 0.0741491492542004 |
| 3 | min | 1.0 | 6.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4 | max | 99980.0 | 100000.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 |


In [42]:
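# use the 'summary' column (count, mean, stddev, min, max) as the row index,
# transpose so each original column becomes a row,
# then move those column names out of the index into a regular column named 'index'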
summary_p = summary_p.set_index('summary').transpose().reset_index()

In [45]:
summary_p.head()

| summary | index | count | mean | stddev | min | max |
|---|---|---|---|---|---|---|
| 0 | id_1 | 5749132 | 33324.48559643438 | 23659.859374488064 | 1.0 | 99980.0 |
| 1 | id_2 | 5749132 | 66587.43558331935 | 23620.48761326969 | 6.0 | 100000.0 |
| 2 | cmp_fname_c1 | 5748125 | 0.7129024704437266 | 0.3887583596162801 | 0.0 | 1.0 |
| 3 | cmp_fname_c2 | 103698 | 0.9000176718903189 | 0.2713176105782334 | 0.0 | 1.0 |
| 4 | cmp_lname_c1 | 5749132 | 0.3156278193080383 | 0.3342336339615828 | 0.0 | 1.0 |


In [46]:
summary_p = summary_p.rename(columns={'index':'field'}) # rename col "index" -> "field"
summary_p.head()

| summary | field | count | mean | stddev | min | max |
|---|---|---|---|---|---|---|
| 0 | id_1 | 5749132 | 33324.48559643438 | 23659.859374488064 | 1.0 | 99980.0 |
| 1 | id_2 | 5749132 | 66587.43558331935 | 23620.48761326969 | 6.0 | 100000.0 |
| 2 | cmp_fname_c1 | 5748125 | 0.7129024704437266 | 0.3887583596162801 | 0.0 | 1.0 |
| 3 | cmp_fname_c2 | 103698 | 0.9000176718903189 | 0.2713176105782334 | 0.0 | 1.0 |
| 4 | cmp_lname_c1 | 5749132 | 0.3156278193080383 | 0.3342336339615828 | 0.0 | 1.0 |


In [47]:
summary_p = summary_p.rename_axis(None, axis=1) # remove the leftover columns-axis name ('summary')

In [48]:
summary_p.head()

|   | field | count | mean | stddev | min | max |
|---|---|---|---|---|---|---|
| 0 | id_1 | 5749132 | 33324.48559643438 | 23659.859374488064 | 1.0 | 99980.0 |
| 1 | id_2 | 5749132 | 66587.43558331935 | 23620.48761326969 | 6.0 | 100000.0 |
| 2 | cmp_fname_c1 | 5748125 | 0.7129024704437266 | 0.3887583596162801 | 0.0 | 1.0 |
| 3 | cmp_fname_c2 | 103698 | 0.9000176718903189 | 0.2713176105782334 | 0.0 | 1.0 |
| 4 | cmp_lname_c1 | 5749132 | 0.3156278193080383 | 0.3342336339615828 | 0.0 | 1.0 |


In [49]:
summary_p.shape

(11, 6)

In [50]:
summaryT = spark.createDataFrame(summary_p)



In [51]:
summaryT.show()


+------------+-------+-------------------+-------------------+---+------+
|       field|  count|               mean|             stddev|min|   max|
+------------+-------+-------------------+-------------------+---+------+
|        id_1|5749132|  33324.48559643438| 23659.859374488064|  1| 99980|
|        id_2|5749132|  66587.43558331935| 23620.487613269695|  6|100000|
|cmp_fname_c1|5748125| 0.7129024704437266|0.38875835961628014|0.0|   1.0|
|cmp_fname_c2| 103698| 0.9000176718903189| 0.2713176105782334|0.0|   1.0|
|cmp_lname_c1|5749132| 0.3156278193080383| 0.3342336339615828|0.0|   1.0|
|cmp_lname_c2|   2464| 0.3184128315317443|0.36856706620066537|0.0|   1.0|
|     cmp_sex|5749132|  0.955001381078048|0.20730111116897781|  0|     1|
|      cmp_bd|5748337|0.22446526708507172|0.41722972238462636|  0|     1|
|      cmp_bm|5748337|0.48885529849763504| 0.4998758236779031|  0|     1|
|      cmp_by|5748337| 0.2227485966810923| 0.4160909629831756|  0|     1|
|     cmp_plz|5736289|0.00552866147434

                                                                                

In [52]:
summaryT.printSchema()

root
 |-- field: string (nullable = true)
 |-- count: string (nullable = true)
 |-- mean: string (nullable = true)
 |-- stddev: string (nullable = true)
 |-- min: string (nullable = true)
 |-- max: string (nullable = true)



In [53]:
from pyspark.sql.types import DoubleType

for c in summaryT.columns:
    if c == 'field':
        continue
    summaryT = summaryT.withColumn(c, summaryT[c].cast(DoubleType()))

In [54]:
summaryT.printSchema()

root
 |-- field: string (nullable = true)
 |-- count: double (nullable = true)
 |-- mean: double (nullable = true)
 |-- stddev: double (nullable = true)
 |-- min: double (nullable = true)
 |-- max: double (nullable = true)



In [55]:
from pyspark.sql.types import DoubleType

def pivot_summary(desc):
    # convert to pandas dataframe
    desc_p = desc.toPandas()
    # transpose
    desc_p = desc_p.set_index('summary').transpose().reset_index()
    desc_p = desc_p.rename(columns={'index': 'field'})
    desc_p = desc_p.rename_axis(None, axis=1)
    # convert to Spark dataframe
    descT = spark.createDataFrame(desc_p)
    # convert metric columns to double from string
    for c in descT.columns:
        if c == 'field':
            continue
        descT = descT.withColumn(c, descT[c].cast(DoubleType()))
    return descT

In [56]:
match_summaryT = pivot_summary(match_summary)
miss_summaryT = pivot_summary(miss_summary)



In [57]:
match_summaryT.show()

+------------+-------+------------------+--------------------+---+-------+
|       field|  count|              mean|              stddev|min|    max|
+------------+-------+------------------+--------------------+---+-------+
|        id_1|20931.0| 34575.72117911232|   21950.31285196913|5.0|99946.0|
|        id_2|20931.0| 51259.95939037791|   24345.73345377519|6.0|99996.0|
|cmp_fname_c1|20922.0|0.9973163859635038| 0.03650667584833679|0.0|    1.0|
|cmp_fname_c2| 1333.0|0.9898900320318176| 0.08251973727615237|0.0|    1.0|
|cmp_lname_c1|20931.0|0.9970152595958817|0.043118807533945126|0.0|    1.0|
|cmp_lname_c2|  475.0| 0.969370167843852| 0.15345280740388917|0.0|    1.0|
|     cmp_sex|20931.0| 0.987291577086618| 0.11201570591216435|0.0|    1.0|
|      cmp_bd|20925.0|0.9970848267622461| 0.05391487659807981|0.0|    1.0|
|      cmp_bm|20925.0|0.9979450418160095|0.045286127452170664|0.0|    1.0|
|      cmp_by|20925.0|0.9961290322580645| 0.06209804856731055|0.0|    1.0|
|     cmp_plz|20902.0|0.9

In [58]:
miss_summaryT.show()

+------------+---------+--------------------+--------------------+----+--------+
|       field|    count|                mean|              stddev| min|     max|
+------------+---------+--------------------+--------------------+----+--------+
|        id_1|5728201.0|  33319.913548075565|  23665.760130330676| 1.0| 99980.0|
|        id_2|5728201.0|   66643.44259218557|  23599.551728241313|30.0|100000.0|
|cmp_fname_c1|5727203.0|  0.7118634802175091| 0.38908060096985553| 0.0|     1.0|
|cmp_fname_c2| 102365.0|  0.8988473514090158| 0.27272090294010215| 0.0|     1.0|
|cmp_lname_c1|5728201.0|  0.3131380113364304|  0.3322812130572686| 0.0|     1.0|
|cmp_lname_c2|   1989.0| 0.16295544855122532|  0.1930236663528703| 0.0|     1.0|
|     cmp_sex|5728201.0|  0.9548833918362851| 0.20755988859217375| 0.0|     1.0|
|      cmp_bd|5727412.0|  0.2216425149788421|  0.4153518275558732| 0.0|     1.0|
|      cmp_bm|5727412.0|   0.486995347986141|  0.4998308940493865| 0.0|     1.0|
|      cmp_by|5727412.0|  0.

In [59]:
match_summaryT.createOrReplaceTempView('match_desc')
miss_summaryT.createOrReplaceTempView('miss_desc')

In [66]:
spark.sql("""
    SELECT a.field, a.count + b.count AS total, a.mean - b.mean AS delta
    FROM match_desc a, miss_desc b 
    WHERE a.field = b.field AND a.field NOT IN ("id_1", "id_2")      
    ORDER BY delta DESC, total DESC

""").show()

+------------+---------+--------------------+
|       field|    total|               delta|
+------------+---------+--------------------+
|     cmp_plz|5736289.0|  0.9563812499852176|
|cmp_lname_c2|   2464.0|  0.8064147192926266|
|      cmp_by|5748337.0|  0.7762059675300512|
|      cmp_bd|5748337.0|   0.775442311783404|
|cmp_lname_c1|5749132.0|  0.6838772482594513|
|      cmp_bm|5748337.0|  0.5109496938298685|
|cmp_fname_c1|5748125.0|  0.2854529057459947|
|cmp_fname_c2| 103698.0| 0.09104268062280174|
|     cmp_sex|5749132.0|0.032408185250332844|
+------------+---------+--------------------+



- By this measure, `cmp_fname_c2` isn’t very useful: it is present for only 103,698 of the 5,749,132 pairs (about 1.8%), and the difference in its mean value between matches and nonmatches is relatively small, 0.09 for a score that ranges from 0 to 1. 
- The `cmp_sex` feature also isn’t particularly helpful: even though it’s available for every pair of records, the difference in means is just 0.03.
- Good features: `cmp_plz`, `cmp_by`, `cmp_bd`, `cmp_lname_c1`, and `cmp_bm`, which combine large differences in means with values for nearly every pair
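The figures quoted for `cmp_fname_c2` and `cmp_sex` can be re-derived from the earlier outputs. A small worked check in plain Python; the numbers are copied from the `match_summaryT`, `miss_summaryT`, delta-query, and `parsed.count()` outputs:

```Python
# Means copied from match_summaryT / miss_summaryT
match_mean = {"cmp_fname_c2": 0.9898900320318176, "cmp_sex": 0.987291577086618}
miss_mean = {"cmp_fname_c2": 0.8988473514090158, "cmp_sex": 0.9548833918362851}

total_rows = 5749132                                    # parsed.count()
present = {"cmp_fname_c2": 103698, "cmp_sex": 5749132}  # "total" column of the delta query

delta = {f: match_mean[f] - miss_mean[f] for f in match_mean}  # match mean minus miss mean
coverage = {f: present[f] / total_rows for f in present}       # share of pairs with a value
```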

### 5.1. Scoring and Model Evaluation

- `expr` function from `pyspark.sql.functions` parses an input expression string into the column that it represents. 
    - For our scoring function, we are going to sum up the value of five fields (`cmp_plz`, `cmp_by`, `cmp_bd`, `cmp_lname_c1`, and `cmp_bm`).

In [67]:
good_features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]

sum_expression = " + ".join(good_features)
sum_expression

'cmp_lname_c1 + cmp_plz + cmp_by + cmp_bd + cmp_bm'

In [69]:
from pyspark.sql.functions import expr
parsed.show(5)

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|14055|94934|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|33948|34740|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|  946|71870|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|64880|71676|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+

In [70]:
# replace nulls with 0 in the good_features columns only; otherwise a single null would make the whole sum null
scored = parsed.fillna(0, subset=good_features).\
                withColumn('score', expr(sum_expression)).\
                select('score', 'is_match')

In [72]:
scored.show(10)

+-----+--------+
|score|is_match|
+-----+--------+
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
+-----+--------+
only showing top 10 rows



##### 5.1.1. Cross Tabulation, or Crosstab
- Create a *contingency* table (sometimes called a **cross tabulation**, or **crosstab**) that counts records along two dimensions: whether the score is above or below a threshold value, and whether the pair is or isn’t a match.

- Since we don’t know which threshold value we’re going to use yet, let’s write a function that takes the scored DataFrame and the threshold as parameters and computes the crosstab using the DataFrame API.

- Note that we use the `selectExpr` method of the *DataFrame API* to compute the boolean field named `above` from the threshold `t`, which Python’s f-string syntax splices into the SQL expression.

In [73]:
from pyspark.sql import DataFrame

def crossTabs(scored: DataFrame, t: float) -> DataFrame:
    # "above" is true when the score meets the threshold; pivot turns the is_match values into columns
    return (scored.selectExpr(f"score >= {t} as above", "is_match")
                  .groupBy("above")
                  .pivot("is_match", ["true", "false"])
                  .count())

In [74]:
crossTabs(scored, 4.0).show()



+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20871|    637|
|false|   60|5727564|
+-----+-----+-------+



                                                                                

In [75]:
crossTabs(scored, 3.5).show()

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20889|   6671|
|false|   42|5721530|
+-----+-----+-------+
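One way to compare the two thresholds, not part of the analysis above, is precision (share of pairs scored above the threshold that are true matches) and recall (share of true matches scored above it). A worked check in plain Python using the counts from the two crosstabs; each `tp + fn` sums to the 20,931 matches and each `fp + tn` to the 5,728,201 nonmatches counted earlier:

```Python
# (above, is_match) counts copied from the crossTabs outputs at t = 4.0 and t = 3.5
tables = {
    4.0: {"tp": 20871, "fp": 637, "fn": 60, "tn": 5727564},
    3.5: {"tp": 20889, "fp": 6671, "fn": 42, "tn": 5721530},
}

def precision_recall(c):
    precision = c["tp"] / (c["tp"] + c["fp"])  # flagged pairs that are true matches
    recall = c["tp"] / (c["tp"] + c["fn"])     # true matches that get flagged
    return precision, recall

results = {t: precision_recall(c) for t, c in tables.items()}
```

Lowering the threshold from 4.0 to 3.5 recovers 18 more matches but admits roughly ten times as many nonmatches, so precision drops from about 0.97 to about 0.76 while recall barely moves.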

