# PySpark basics: loading, exploring and saving data

# The Spark Context
An initial note about how PySpark interacts with Jupyter notebooks: two objects are central, the Spark context (`sc`) and the Spark session (`spark`). Some environments create both as global variables during startup; in this notebook we create them explicitly in the cells below.
They provide access to many of the underlying structures used by PySpark, and you may see them referred to in code throughout the tutorials alongside functions imported from `pyspark.sql`.

In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Using legacy 'setup.py install' for pyspark, since package 'wheel' is not installed.
Installing collected packages: py4j, pyspark
    Running setup.py install for pyspark ... done
Successfully installed py4j-0.10.9 pyspark-3.0.1
Note: you may need to restart the kernel to use updated packages.


In [2]:
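from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession  # SparkSession is the entry point to the DataFrame API
# Column types used when defining a schema by hand (StringType is needed by the custom schema below)
from pyspark.sql.types import DateType, TimestampType, IntegerType, FloatType, LongType, DoubleType, StringType
from pyspark.sql.types import StructType, StructField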

In [3]:
# Initiate a spark context
sc = SparkContext()

In [4]:
sc

In [5]:
# Initiate a Spark session
spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [6]:
spark

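# Loading CSV
Using `inferSchema` lets Spark work out the data types by itself, but inference takes time, especially on large datasets. Specifying the data types yourself, on the other hand, is error-prone. A practical compromise is to use `inferSchema` the first time you load the data, note down the data type of every column, and reuse that information to make later loads faster and more convenient.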

In [9]:
session = SparkSession.builder.appName("santander").master("local[*]").getOrCreate()
df = session.read.csv("../pyspark-learning/boston_house_prices.csv", header=True, inferSchema=True, sep=",")


In [None]:
"""
Manually specify the schema dtypes:
custom_schema = StructType([StructField('_c0', DateType(), True),
                           StructField('_c1', StringType(), True),
                           StructField('_c2', DoubleType(), True),
                           StructField('_c3', DoubleType(), True),
                           StructField('_c4', DoubleType(), True),
                           StructField('_c5', IntegerType(), True),
                           ...
                           StructField('_c27', StringType(), True)])
                          
df = spark.read.csv("../nput/train.csv", header=False, schema=custom_schema, sep=',')
"""

One example of using inference and a custom schema together is a large, unfamiliar dataset that you know you will need to load and work with repeatedly. The first time you load it, use `inferSchema` and make a note of the dtypes it assigns. Then use that information to build the custom schema, so that future loads avoid the extra processing time that inference requires.



# Common APIs

# Exploring Data

In [10]:
# count the number of rows
df.count()

507

In [12]:
# column-by-column dtypes
df.dtypes

[('506', 'string'),
 ('13', 'string'),
 ('_c2', 'string'),
 ('_c3', 'string'),
 ('_c4', 'string'),
 ('_c5', 'string'),
 ('_c6', 'string'),
 ('_c7', 'string'),
 ('_c8', 'string'),
 ('_c9', 'string'),
 ('_c10', 'string'),
 ('_c11', 'string'),
 ('_c12', 'string'),
 ('_c13', 'string')]

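For each pairing (a tuple object in Python, denoted by the parentheses), 
the first entry is the column name and the second is the dtype. 
Notice that the column names look wrong: we specified `header=True` when we loaded the file, but its first line appears to hold only two values (`506` and `13`, which look like row and column counts) rather than the real column names. 
Spark therefore used `506` and `13` as the first two names and fell back to its default naming convention (`_c2`, `_c3`, ... `_c13`) for the rest. The real header row was read as data, which is also why every column was inferred as `string` and `count()` reported 507 rows. We'll make some changes to the names in a minute.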

In [14]:
# take a peek at the first few rows
df.take(5)

[Row(506='CRIM', 13='ZN', _c2='INDUS', _c3='CHAS', _c4='NOX', _c5='RM', _c6='AGE', _c7='DIS', _c8='RAD', _c9='TAX', _c10='PTRATIO', _c11='B', _c12='LSTAT', _c13='MEDV'),
 Row(506='0.00632', 13='18', _c2='2.31', _c3='0', _c4='0.538', _c5='6.575', _c6='65.2', _c7='4.09', _c8='1', _c9='296', _c10='15.3', _c11='396.9', _c12='4.98', _c13='24'),
 Row(506='0.02731', 13='0', _c2='7.07', _c3='0', _c4='0.469', _c5='6.421', _c6='78.9', _c7='4.9671', _c8='2', _c9='242', _c10='17.8', _c11='396.9', _c12='9.14', _c13='21.6'),
 Row(506='0.02729', 13='0', _c2='7.07', _c3='0', _c4='0.469', _c5='7.185', _c6='61.1', _c7='4.9671', _c8='2', _c9='242', _c10='17.8', _c11='392.83', _c12='4.03', _c13='34.7'),
 Row(506='0.03237', 13='0', _c2='2.18', _c3='0', _c4='0.458', _c5='6.998', _c6='45.8', _c7='6.0622', _c8='3', _c9='222', _c10='18.7', _c11='394.63', _c12='2.94', _c13='33.4')]

Each row is printed in the format `column_name=value`. The output above is hard to read because `take` makes no attempt at formatting; it just returns the `Row` objects themselves. `show` formats the data as a table instead, but with this many columns the table is too wide to fit, so each line wraps onto the next. We'll use `show` on a subset below.

In [15]:
df.show()

+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|    506|  13|  _c2| _c3|  _c4|  _c5| _c6|   _c7|_c8|_c9|   _c10|  _c11| _c12|_c13|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|TAX|PTRATIO|     B|LSTAT|MEDV|
|0.00632|  18| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296|   15.3| 396.9| 4.98|  24|
|0.02731|   0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729|   0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237|   0| 2.18|   0|0.458|6.998|45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905|   0| 2.18|   0|0.458|7.147|54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
|0.02985|   0| 2.18|   0|0.458| 6.43|58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|
|0.08829|12.5| 7.87|   0|0.524|6.012|66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|
|0.14455|12.5| 7.87|   0|0.524|6.172|96.1|5.9505|  5|311|   15.2| 396.9|19.1

# Selecting and Renaming Columns

`select` returns a new dataframe that contains only a subset of the columns:

In [16]:
df_lim = df.select('_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', '_c13')

When renaming, you can rename one column at a time, or rename several columns at once from a list or dictionary of names:

In [17]:
old_names = ['_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', '_c13']
new_names = ['servicer_name', 'new_int_rt', 'act_endg_upb', 'loan_age', 'mths_remng', 'aj_mths_remng', 'dt_matr', 'cd_msa', 'delq_sts', 'flag_mod', 'cd_zero_bal', 'dt_zero_bal']
for old, new in zip(old_names, new_names):
    df_lim = df_lim.withColumnRenamed(old, new)

In [18]:
df_lim.columns

['servicer_name',
 'new_int_rt',
 'act_endg_upb',
 'loan_age',
 'mths_remng',
 'aj_mths_remng',
 'dt_matr',
 'cd_msa',
 'delq_sts',
 'flag_mod',
 'cd_zero_bal',
 'dt_zero_bal']

# Describe
Now we'll describe the data. Note that `describe` returns a new dataframe holding the summary statistics, so we must call `show` on it to view the result (note the nice formatting in this case). It can be called on one or more specific columns, as we do here, or on the entire dataframe by passing no columns to `describe`:

In [19]:
df.describe('_c2', '_c3')

DataFrame[summary: string, _c2: string, _c3: string]

In [21]:
df.describe('_c2', '_c3').show()

+-------+------------------+------------------+
|summary|               _c2|               _c3|
+-------+------------------+------------------+
|  count|               507|               507|
|   mean|11.136778656126504|0.0691699604743083|
| stddev| 6.860352940897589|0.2539940413404101|
|    min|              0.46|                 0|
|    max|             INDUS|              CHAS|
+-------+------------------+------------------+



## Group by the values of a column, then run some calculations
df_grp = new_df.groupBy('label')
label_avg = df_grp.avg("label")
label_avg.show()

However, if you actually ran the code, you probably noticed that the code block finished nearly instantly, despite there being over 3.5 million rows of data. This is an example of lazy evaluation: **nothing was actually computed here**. At the moment, we're just creating a list of instructions. All PySpark did was make sure they were valid instructions. Now let's see what happens if we tell it to show us the results:

That takes a bit longer to run, because when you executed `show` you asked for actual results to be returned to you, which meant **Spark went back and calculated the three previous operations**. You could have done any number of intermediate steps similar to those before calling `show` and they all would have been **lazy operations** that finished nearly instantly, until `show` ran them all.

Now this would just be a background peculiarity, except that we have some control over the process. If you imagine your lineage as a **straight line of instructions leading from your source data to your output**, we can use the **persist() method to create a point for branching**. Essentially it tells Spark "follow the instructions to this point, then hold these results because I'm going to come back to them again."

Let's redo the previous code block with a persist():

# Persist and Cache

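Calling `.persist()` tells Spark to store the result of every operation up to that point, so that later computations that start from there do not have to rerun the preceding steps.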

(The trailing `;` simply suppresses the output of the command. We don't need to see the summary of what we just unpersisted.)

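Also note that `cache()` is essentially a synonym for `persist()` called with no arguments. For dataframes, both use the default storage level, which in Spark 3.0 is `MEMORY_AND_DISK`: Spark keeps the data in memory for the fastest recall and spills whatever does not fit to disk. (For RDDs, `cache()` defaults to memory only.) `persist()` additionally accepts an explicit storage level such as `StorageLevel.MEMORY_ONLY`. Memory-only storage is only effective if the data you are asking Spark to hold is small enough to fit in the memory available across the executors, so use it with care.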

And finally, a bit more on groupBy. Hopefully the usage above has given you some insight into how it works. In short, groupBy is the vehicle for aggregation in a dataframe. **A groupBy object is, in itself, incomplete. So, the line in the code block where we introduced a persist() above that looks like this:**

**(A sub-dataframe that later operations will reuse is a common candidate for `.persist()`.)**

df_grp = perf_keep.groupBy('_c2')

which generates a groupBy object where the data is grouped around the unique values found in column `_c2`, but it is just a foundation. It is like the sentence "We are going to group our data up by the unique values found in column `_c2`, and then..." The next line of code contains the rest:

df_avg = df_grp.avg('_c3', '_c5', '_c6', '_c12', 'New_c12')

Or to finish the sentence, "... calculate the averages for these five columns within each group."

# Writing Data

In [28]:
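# Select the columns to save, then write them out as CSV with a header row.
# Note: 'target' is not a column of this dataframe, so this call raises the AnalysisException shown below.
df.select('target').write.format('com.databricks.spark.csv').option("header", "true").save("../input/spark_write.csv")
# Write a dataframe to S3 (df_described is assumed to have been defined elsewhere, e.g. as the result of a describe call)
df_described.write.format('com.databricks.spark.csv').option("header", "true").save('s3://ui-spark-social-science-public/data/mycsv')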


AnalysisException: cannot resolve '`target`' given input columns: [13, 506, _c10, _c11, _c12, _c13, _c2, _c3, _c4, _c5, _c6, _c7, _c8, _c9];;
'Project ['target]
+- Relation[506#16,13#17,_c2#18,_c3#19,_c4#20,_c5#21,_c6#22,_c7#23,_c8#24,_c9#25,_c10#26,_c11#27,_c12#28,_c13#29] csv
