In [1]:
import pyspark

In [2]:
# Reuse the already-running SparkSession if there is one; otherwise start a new one.
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)

In [5]:
# Bare attribute lookups only: neither reader method is called, so no data is loaded.
# The notebook echoes just the last expression, i.e. the bound `parquet` method.
spark.read.json
spark.read.parquet

<bound method DataFrameReader.parquet of <pyspark.sql.readwriter.DataFrameReader object at 0x11f36fe80>>

In [3]:
# Short-hand CSV read with no options: the columns come back named _c0/_c1,
# both typed as string.
spark.read.csv('./sa311/source.csv')

DataFrame[_c0: string, _c1: string]

In [7]:
# Long-hand form of the CSV read: pick the format, then load the path.
# No 'header' option is set, so the header line appears as an ordinary data row.
(
    spark.read
    .format('csv')
    .load('./sa311/source.csv')
    .show(5)
)

+---------+----------------+
|      _c0|             _c1|
+---------+----------------+
|source_id| source_username|
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
+---------+----------------+
only showing top 5 rows



In [10]:
# Same CSV read, now treating the first line as column names and asking Spark
# to infer the column types from the data.
(
    spark.read
    .format('csv')
    .option('inferSchema', True)
    .option('header', True)
    .load('./sa311/source.csv')
)

DataFrame[source_id: string, source_username: string]

In [12]:
from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema for the sources file: both columns are declared as strings
# (nullable, which is the StructField default).
schema = StructType(
    [StructField(field_name, StringType()) for field_name in ("source_id", "source_username")]
)

schema

StructType(List(StructField(source_id,StringType,true),StructField(source_username,StringType,true)))

In [15]:
# Read the sources CSV using the hand-written schema.
# The schema must be supplied through DataFrameReader.schema(); passing it via
# .option('schema', ...) only sets a string option that the CSV source does not
# recognise, so the schema was silently not applied.
df = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv('./sa311/source.csv')
)

In [20]:
# in one notebook
# mode='overwrite' replaces the output directory, so re-running this cell no longer
# fails because the path already exists; header=True keeps the column names in the files.
df.write.csv('sa311_data_part1', mode='overwrite', header=True)
# read it into another notebook
# header=True restores the column names written above (otherwise they come back as _c0, _c1).
spark.read.csv('sa311_data_part1', header=True).show()

In [21]:
# Register the DataFrame under the name 'sources' so it can be referenced from
# spark.sql queries; an existing temp view with that name is replaced.
df.createOrReplaceTempView('sources')

In [22]:
# Query the temp view with SQL; the result is itself a DataFrame.
spark.sql('SELECT * FROM sources')

DataFrame[source_id: string, source_username: string]

In [26]:
# Switch to the cases file and keep only the two lateness columns.
# NOTE: this rebinds `df`, which previously held the sources data.
df = (
    spark.read
    .csv('./sa311/case.csv', header=True)
    .select('case_late', 'num_days_late')
)

In [28]:
# Preview the data; with no arguments show() prints the first 20 rows.
df.show()

+---------+-------------------+
|case_late|      num_days_late|
+---------+-------------------+
|       NO| -998.5087616000001|
|       NO|-2.0126041669999997|
|       NO|       -3.022337963|
|       NO|       -15.01148148|
|      YES|0.37216435200000003|
|       NO|       -29.74398148|
|       NO|       -14.70673611|
|       NO|       -14.70662037|
|       NO|       -14.70662037|
|       NO|       -14.70649306|
|       NO|       -14.70649306|
|       NO|       -14.70636574|
|       NO|          -14.70625|
|       NO|       -14.70636574|
|       NO|       -14.70623843|
|       NO|-14.705891199999998|
|       NO|       -14.70600694|
|       NO|       -14.70576389|
|       NO|       -14.70576389|
|       NO|       -14.70564815|
+---------+-------------------+
only showing top 20 rows



In [35]:
# pandas-style (rows, columns) tuple. count() is a Spark action that scans the
# data, whereas the column list comes from the DataFrame's schema.
shape = (df.count(), len(df.columns))
print(shape)

(841704, 2)


In [42]:
# Summary statistics for num_days_late. The file was read without inferSchema,
# so the column is presumably still text and is cast to a number first.
# describe() counts only non-null values, so its count can be lower than df.count().
(
    df
    .select(df['num_days_late'].cast('float'))
    .describe()
    .show()
)

+-------+------------------+
|summary|     num_days_late|
+-------+------------------+
|  count|            841671|
|   mean|-49.07486758743872|
| stddev| 176.5300249959313|
|    min|        -1417.0006|
|    max|         519.69806|
+-------+------------------+



In [45]:
# List the distinct values present in case_late.
(
    df
    .select('case_late')
    .distinct()
    .show()
)

+---------+
|case_late|
+---------+
|      YES|
|       NO|
+---------+



In [46]:
# Register the cases DataFrame as the temp view 'cases' so it can be queried with SQL.
df.createOrReplaceTempView('cases')

In [48]:
# Count cases per case_late value using SQL against the 'cases' temp view.
# The aggregate is not aliased, so the result column is named count(1).
spark.sql('''
SELECT case_late, COUNT(*)
FROM cases
GROUP BY case_late
''').show()

+---------+--------+
|case_late|count(1)|
+---------+--------+
|      YES|   94503|
|       NO|  747201|
+---------+--------+



In [52]:
# DataFrame-API equivalent of the SQL GROUP BY: rows per case_late value.
(
    df
    .groupBy('case_late')
    .count()
    .show()
)

+---------+------+
|case_late| count|
+---------+------+
|      YES| 94503|
|       NO|747201|
+---------+------+



In [62]:
from pyspark.sql.functions import col, expr, count

# Same per-group count, written as a SQL expression string so the result
# column can be aliased to n_cases.
(
    df
    .groupBy(col('case_late'))
    .agg(expr('count(*) AS n_cases'))
    .show()
)

+---------+-------+
|case_late|n_cases|
+---------+-------+
|      YES|  94503|
|       NO| 747201|
+---------+-------+



In [61]:
# Same aggregation using the count() column function and .alias() instead of a
# SQL expression string. count(column) counts non-null values of that column.
(
    df
    .groupBy(df['case_late'])
    .agg(count(df['case_late']).alias('n_cases'))
    .show()
)

+---------+-------+
|case_late|n_cases|
+---------+-------+
|      YES|  94503|
|       NO| 747201|
+---------+-------+

