# Working with purchases using DataFrames

## Load data

In [1]:
rdd = sc.textFile('PURCHASES/purchases.txt')

## Convert to a DataFrame

In [3]:
from pyspark.sql import Row

def parse_row(line):
    """Converts a line into a Row
       If the line is a data line it is converted to a Row and returned as a list with that Row,
       otherwise an empty list is returned.
    """
    if len(line.split('\t'))==6:
        data, hora, cidade, tipo, custo, pago = line.split('\t')
        return [Row(data=data, hora=hora, cidade=cidade, tipo=tipo, custo=float(custo), pago=pago)]
    return []

Using flatMap we have the flexibility to return nothing from a call to the function, this is accomplished returning and empty array.

In [4]:
data = rdd.flatMap(parse_row).toDF()

In [65]:
# Imprime o esquema do Dataframe

In [5]:
data.printSchema()

root
 |-- cidade: string (nullable = true)
 |-- custo: double (nullable = true)
 |-- data: string (nullable = true)
 |-- hora: string (nullable = true)
 |-- pago: string (nullable = true)
 |-- tipo: string (nullable = true)



In [66]:
# Mostra algunhas liñas do Dataframe

In [6]:
data.show(4)

+----------+------+----------+-----+--------+----------------+
|    cidade| custo|      data| hora|    pago|            tipo|
+----------+------+----------+-----+--------+----------------+
|  San Jose|214.05|2012-01-01|09:00|    Amex|  Men's Clothing|
|Fort Worth|153.57|2012-01-01|09:00|    Visa|Women's Clothing|
| San Diego| 66.08|2012-01-01|09:00|    Cash|           Music|
|Pittsburgh|493.51|2012-01-01|09:00|Discover|    Pet Supplies|
+----------+------+----------+-----+--------+----------------+
only showing top 4 rows



## Count the number of points

In [19]:
data.count() # Datos totales del dataset

4138476

## Count the number of points per city

In [25]:
data.groupby('cidade').count().show() # cuantas veces aparece cada ciudad

+---------------+-----+
|         cidade|count|
+---------------+-----+
|North Las Vegas|40013|
|        Phoenix|40333|
|          Omaha|40209|
|      Anchorage|39806|
|        Anaheim|40086|
|     Greensboro|40232|
|         Dallas|40368|
|         Laredo|40342|
|        Oakland|39728|
|     Scottsdale|40173|
|    San Antonio|40197|
|    Bakersfield|40326|
|    Chula Vista|40080|
|        Raleigh|40261|
|     Louisville|40099|
|   Philadelphia|40748|
|    Los Angeles|40254|
|       Chandler|39826|
|     Sacramento|40561|
|   Indianapolis|40321|
+---------------+-----+
only showing top 20 rows



## Top 10 cities with more purchases (lines)

In [30]:
from pyspark.sql.functions import desc, count
data.groupby('cidade').agg(count('*').alias('count')).orderBy(desc('count')).show(10)


+-------------+-----+
|       cidade|count|
+-------------+-----+
| Philadelphia|40748|
|       Newark|40577|
|   Sacramento|40561|
|    Charlotte|40509|
|   Washington|40503|
|       Durham|40501|
|    Rochester|40455|
|   Cincinnati|40452|
|Oklahoma City|40446|
|   Fort Wayne|40439|
+-------------+-----+
only showing top 10 rows



## Filter Music data

In [41]:
music = data.filter(data.tipo == "Music")
music.show(10)

+-------------+------+----------+-----+----------+-----+
|       cidade| custo|      data| hora|      pago| tipo|
+-------------+------+----------+-----+----------+-----+
|    San Diego| 66.08|2012-01-01|09:00|      Cash|Music|
|San Francisco|260.65|2012-01-01|09:00|  Discover|Music|
|    Anchorage|298.86|2012-01-01|09:01|MasterCard|Music|
|   Pittsburgh| 46.99|2012-01-01|09:03|  Discover|Music|
|  Jersey City| 11.29|2012-01-01|09:04|      Cash|Music|
|      Houston|461.11|2012-01-01|09:05|  Discover|Music|
|   Scottsdale| 461.2|2012-01-01|09:06|      Amex|Music|
|  Kansas City|364.24|2012-01-01|09:06|      Amex|Music|
|      Houston|331.68|2012-01-01|09:06|MasterCard|Music|
|      Phoenix|  94.3|2012-01-01|09:08|      Visa|Music|
+-------------+------+----------+-----+----------+-----+
only showing top 10 rows



## Find the total purchases cost per day of Music in San Diego

In [42]:
from pyspark.sql.functions import sum

In [43]:
sandiego_music = music.filter(music.cidade == "San Diego")

cost = sandiego_music.select(sum('custo')).collect()[0][0]

print("Valor de compras en San Diego:", (cost))

('Valor de compras en San Diego:', 553940.6799999995)
