In [1]:
# Display the active SparkSession. `spark` is not defined in this notebook;
# presumably it is pre-created by the PySpark/Dataproc kernel — confirm.
spark

In [2]:
# Load the taxi-trip CSV (first row = column names; Spark infers column types).
# NOTE: inference leaves tpep_*_datetime as strings (see printSchema below);
# cast with to_timestamp if time-based analysis is needed.
df = spark.read.options(header=True, inferSchema=True).csv("gs://hpl-bucket1/data/yellow_tripdata_2018-10.csv")

                                                                                

In [3]:
# Inspect the inferred column types, then peek at the first rows.
df.printSchema()
df.show(n=5)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+-

In [4]:
# Q2 only needs passenger count, payment type and the total charged per trip.
q2_info = df.select(['passenger_count', 'payment_type', 'total_amount'])

In [5]:
# Preview the projection; the trailing count() is the cell's displayed value.
q2_info.show(n=5)
q2_info.count()

+---------------+------------+------------+
|passenger_count|payment_type|total_amount|
+---------------+------------+------------+
|              1|           2|        21.8|
|              1|           2|        36.3|
|              1|           1|       25.35|
|              1|           2|         8.3|
|              1|           1|       14.75|
+---------------+------------+------------+
only showing top 5 rows



                                                                                

8821105

In [6]:
# Summary statistics (count/mean/stddev/min/max) for the Q2 columns.
# Per the output: passenger_count has a min of 0, and total_amount has negative
# values and a very large max. The filters below drop passenger_count == 0 but do
# NOT remove negative or extreme total_amount values.
q2_info.describe().show()



+-------+------------------+-------------------+------------------+
|summary|   passenger_count|       payment_type|      total_amount|
+-------+------------------+-------------------+------------------+
|  count|           8821105|            8821105|           8821105|
|   mean| 1.569944014950508| 1.3024741231399013| 16.96397637618119|
| stddev|1.2188878126211988|0.47957818043819983|136.66237916338224|
|    min|                 0|                  1|            -475.3|
|    max|                 9|                  5|         403408.18|
+-------+------------------+-------------------+------------------+



                                                                                

In [7]:
# Keep only trips whose payment_type is credit card or cash (codes 1 and 2).
target_payment = q2_info.where(q2_info['payment_type'].isin(1, 2))
target_payment.count()

                                                                                

8763628

In [8]:
# Keep trips with 1-4 passengers (passenger_count is an integer column, so the
# inclusive between(1, 4) equals the original "> 0 and < 5").
# The column is resolved against target_payment, the DataFrame being filtered,
# rather than its parent q2_info, so the predicate does not rely on Spark
# matching attributes back through the lineage.
target_passenger = target_payment.filter(target_payment['passenger_count'].between(1, 4))
target_passenger.count()

                                                                                

8060128

In [9]:
import pyspark.sql.functions as F

In [10]:
# Per passenger_count: number of trips, mean total_amount and max total_amount.
q2_result = (
    target_passenger
    .groupBy('passenger_count')
    .agg(
        F.count('passenger_count'),
        F.mean('total_amount'),
        F.max('total_amount'),
    )
)

In [11]:
# Show the aggregated table ordered by passenger count (sort is orderBy's alias).
q2_result.sort('passenger_count').show()



+---------------+----------------------+------------------+-----------------+
|passenger_count|count(passenger_count)| avg(total_amount)|max(total_amount)|
+---------------+----------------------+------------------+-----------------+
|              1|               6256861|16.850306142476434|        403408.18|
|              2|               1276519|17.559149350667102|           2243.7|
|              3|                364502| 17.25161049871787|           873.32|
|              4|                162246| 17.51855731419773|            490.8|
+---------------+----------------------+------------------+-----------------+



                                                                                

In [28]:
# Read the speech as plain text: one row per line, in a single string column 'value'.
txt_file = spark.read.format("text").load("gs://hpl-bucket1/data/Youvegottofindwhatyoulove.txt")

In [29]:
# Peek at the raw lines (blank rows are the empty lines between paragraphs).
txt_file.show(n=20, truncate=True)

+--------------------+
|               value|
+--------------------+
|I am honored to b...|
|                    |
|I dropped out of ...|
|                    |
|It started before...|
|                    |
|And 17 years late...|
|                    |
|It wasn’t all rom...|
|                    |
|Reed College at t...|
|                    |
|None of this had ...|
|                    |
|Again, you can’t ...|
|                    |
|My second story i...|
|                    |
|I was lucky — I f...|
|                    |
+--------------------+
only showing top 20 rows



In [53]:
# Bring every line to the driver and print the Python type of its 'value' field.
rows = txt_file.collect()
for row in rows:
    print(type(row['value']))

<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>


In [55]:
# remove punctuation
from pyspark.sql.functions import regexp_replace, trim, col, lower

In [56]:
def removePunctuation(column):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only whitespace, letters, and numbers should be retained. The pattern is a
        Java regex evaluated by Spark's ``regexp_replace``. It matches the Unicode
        punctuation (P*) and symbol (S*) categories. The POSIX class
        ``\\p{Punct}`` is US-ASCII only and would leave typographic characters
        such as U+2019 (right single quote, as in "wasn't" written with a curly
        apostrophe) and U+2014 (em dash) in the text, producing bogus "words".
        P* and S* together cover everything ``\\p{Punct}`` matched.

    Args:
        column (Column): A Column containing a sentence.

    Returns:
        Column: A Column named 'sentence' with clean-up operations applied.
    """
    # Delete every punctuation/symbol code point, then trim spaces and lower-case.
    return lower(trim(regexp_replace(column, r'[\p{P}\p{S}]', ''))).alias('sentence')

In [57]:
# Clean every line of the speech; the result has a single column, 'sentence'.
txt_df = txt_file.select(removePunctuation(txt_file['value']))
txt_df.show(n=5)

+--------------------+
|            sentence|
+--------------------+
|i am honored to b...|
|                    |
|i dropped out of ...|
|                    |
|it started before...|
+--------------------+
only showing top 5 rows



In [58]:
# words from lines
from pyspark.sql.functions import split, explode

In [59]:
# Split each cleaned sentence on runs of whitespace and emit one row per word.
# Drop empty strings (blank lines split to a single '').
# Raw string: the old '[\s]+' relied on Python keeping an invalid escape sequence,
# which triggers a DeprecationWarning (SyntaxWarning on Python 3.12+).
# NOTE(review): this rebinds txt_df, so re-running the cell alone fails
# (txt_df no longer has a 'sentence' column); re-run the previous cell first.
txt_df = txt_df.select(explode(split(txt_df.sentence, r'\s+')).alias('word')).where("word!=''")
txt_df.show(5)

+-------+
|   word|
+-------+
|      i|
|     am|
|honored|
|     to|
|     be|
+-------+
only showing top 5 rows



In [60]:
def wordCount(wordListDF):
    """Creates a DataFrame with word counts.

    Args:
        wordListDF (DataFrame): A DataFrame with a string column named 'word'.

    Returns:
        DataFrame: One row per distinct word, with columns 'word' and 'count'.
    """
    return wordListDF.groupBy('word').count()

In [61]:
# Rank words by frequency, most frequent first, and show the top 30.
topWordsAndCountsDF = wordCount(txt_df).orderBy('count', ascending=False)
topWordsAndCountsDF.show(n=30)

+-------+-----+
|   word|count|
+-------+-----+
|    the|   96|
|      i|   86|
|     to|   71|
|    and|   67|
|     it|   53|
|    was|   48|
|      a|   46|
|     of|   41|
|   that|   38|
|     in|   34|
|    you|   31|
|     my|   30|
|     is|   28|
|    had|   22|
|    out|   20|
|   with|   19|
|     me|   18|
|   have|   17|
|    for|   17|
|     so|   17|
|   life|   16|
|   your|   16|
|    all|   16|
|     on|   15|
|     as|   15|
|   what|   15|
|college|   14|
|     be|   14|
|    but|   14|
|   from|   13|
+-------+-----+
only showing top 30 rows

