# RDD API Examples

## Word Count
In this example, we use a few transformations to build a dataset of (String, Int) pairs called `counts`, sort it by count, and display the 50 most frequent words. The call that saves the result to a file is left commented out.
```
sc.textFile(name, minPartitions=None, use_unicode=True)
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
```


In [8]:
import os

# sc.textFile(os.getcwd() + "/../datasets/quijote.txt") would need the file on every worker.
# To avoid copying a local file to all workers, read it on the driver and parallelize the lines.

lines = []
with open('../datasets/quijote.txt') as my_file:
    for line in my_file:
        lines.append(line)
text_file = sc.parallelize(lines)


counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts = counts.sortBy(lambda a: a[1], ascending=False)
#NOTE: sortBy is not as efficient as sortByKey since it involves keying by the values,
#sorting by the keys, and then grabbing the values 
counts.take(50)

#counts.saveAsTextFile(os.path.join("/notebooks/","quixote-counts.txt"))


[('que', 18240),
 ('de', 16633),
 ('y', 14708),
 ('la', 9245),
 ('a', 8856),
 ('en', 7400),
 ('el', 7309),
 ('\n', 5834),
 ('no', 5251),
 ('se', 4309),
 ('los', 4194),
 ('con', 3769),
 ('por', 3475),
 ('', 3361),
 ('lo', 3167),
 ('las', 3097),
 ('le', 3053),
 ('su', 2975),
 ('don', 2345),
 ('del', 2251),
 ('me', 2146),
 ('como', 2069),
 ('es', 1871),
 ('un', 1754),
 ('más', 1673),
 ('si', 1668),
 ('yo', 1614),
 ('al', 1534),
 ('mi', 1526),
 ('de\n', 1352),
 ('para', 1301),
 ('ni', 1257),
 ('una', 1192),
 ('que\n', 1189),
 ('y\n', 1186),
 ('y,', 1136),
 ('porque', 1126),
 ('tan', 1108),
 ('o', 1067),
 ('sin', 1044),
 ('que,', 995),
 ('él', 971),
 ('ha', 962),
 ('la\n', 955),
 ('ser', 927),
 ('Sancho', 908),
 ('sus', 904),
 ('había', 903),
 ('todo', 898),
 ('-dijo', 870)]
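
The result above contains tokens such as `'\n'`, `''` and `'de\n'`. Each line keeps its trailing newline, and `split(" ")` breaks only on single spaces. The plain-Python sketch below (no Spark) compares it with `split()`, which breaks on any run of whitespace. The helper `count_words` is hypothetical, and the second sample line is made up to show a double space.

```python
from collections import Counter

def count_words(lines, tokenize):
    """Plain-Python stand-in for flatMap + map + reduceByKey."""
    counts = Counter()
    for line in lines:
        counts.update(tokenize(line))
    return counts

# Sample input; the second line is invented and contains a double space.
sample_lines = [
    "Carta de don Quijote a Dulcinea del Toboso\n",
    "de  don Quijote\n",
]

# Splitting on a single space keeps '' and newline-suffixed tokens.
by_space = count_words(sample_lines, lambda line: line.split(" "))
# Splitting on any whitespace drops them.
by_whitespace = count_words(sample_lines, lambda line: line.split())

print(by_space)
print(by_whitespace)
```

In the Spark pipeline, the corresponding change would be `flatMap(lambda line: line.split())`; the counts listed above would then differ, because tokens such as `'de'` and `'de\n'` would be merged.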

## Pi Estimation

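Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1, 1)) and count how many fall inside the unit circle centered at the origin. Only a quarter of that circle lies in the square; its area is π / 4 and the square has area 1, so the fraction of points inside should approach π / 4. Multiplying the fraction by 4 gives our estimate.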

In [9]:
import random

NUM_SAMPLES=12000000

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, NUM_SAMPLES)) \
             .filter(inside).count()
print ("Pi is roughly {}".format(4.0 * count / NUM_SAMPLES))

Pi is roughly 3.140752
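
To check the logic without a cluster, the same dart-throwing estimate can be sketched in plain Python. The function name `estimate_pi`, the seed, and the sample size are illustrative choices and are not part of the notebook.

```python
import random

def estimate_pi(num_samples, seed=0):
    """Monte Carlo estimate of pi from random points in the unit square."""
    rng = random.Random(seed)  # local generator so runs are repeatable
    hits = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # point lies inside the quarter disc
            hits += 1
    # hits / num_samples approximates pi / 4
    return 4.0 * hits / num_samples

estimate = estimate_pi(200_000)
print("Pi is roughly {}".format(estimate))
```

The error of such an estimate shrinks roughly with the square root of the number of samples, which is why the Spark version spreads millions of samples over the cluster.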


# DataFrame API Examples

### Testing Conversion to/from Pandas with Arrow

In [10]:
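import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
# (Spark 3.0+ renames this key to "spark.sql.execution.arrow.pyspark.enabled")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")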

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()


In this example, we count all the lines of the Quijote that mention Dulcinea.

In [11]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col


# Creates a DataFrame having a single column named "line"
df = text_file.map(lambda r: Row(r)).toDF(["line"])
dulcinea_lines = df.filter(col("line").like("%Dulcinea%"))
# Counts all the Dulcinea lines
print("There are {} lines with 'Dulcinea'".format(dulcinea_lines.count()))
# Counts lines mentioning Dulcinea and Quijote
print("There are {} lines with 'Dulcinea' and 'Quijote'".format(
    dulcinea_lines.filter(col("line").like("%Quijote%")).count()))
# Fetches the lines mentioning both Dulcinea and Quijote as a list of Row objects
dulcinea_lines.filter(col("line").like("%Quijote%")).collect()

There are 282 lines with 'Dulcinea'
There are 12 lines with 'Dulcinea' and 'Quijote'


[Row(line='aquella noche no durmió don Quijote, pensando en su señora Dulcinea, por\n'),
 Row(line='Quijote pedía, y sin preguntar quién Dulcinea fuese, le prometió que el\n'),
 Row(line='Carta de don Quijote a Dulcinea del Toboso\n'),
 Row(line='don Quijote que si, en nombrando a Dulcinea, no decía también del Toboso,\n'),
 Row(line='Don Quijote, que tales blasfemias oyó decir contra su señora Dulcinea, no\n'),
 Row(line='-Si no fue la que llevaste a la señora Dulcinea -replicó don Quijote-, yo\n'),
 Row(line='a don Quijote que qué nuevas tenía de la señora Dulcinea, y que si le había\n'),
 Row(line='-A eso puedo decir -respondió don Quijote- que Dulcinea es hija de sus\n'),
 Row(line='Dulcinea del Toboso, ni le llevó la carta del señor don Quijote, porque se\n'),
 Row(line='encantada Dulcinea; en don Quijote, por no poder asegurarse si era verdad o\n'),
 Row(line='que pinta a don Quijote ya desenamorado de Dulcinea del Toboso.\n'),
 Row(line='-No quiero saber más -dijo don Quijote-; 
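
`like("%Dulcinea%")` is a SQL `LIKE` pattern in which `%` matches any sequence of characters, so here it behaves like a case-sensitive substring test. A rough plain-Python equivalent is sketched below, using two lines from the output above and one extra line added for contrast. The helper `contains` is hypothetical.

```python
def contains(line, word):
    """Rough stand-in for col("line").like("%" + word + "%")."""
    return word in line

sample_lines = [
    "Carta de don Quijote a Dulcinea del Toboso\n",
    "Quijote pedía, y sin preguntar quién Dulcinea fuese, le prometió que el\n",
    "En un lugar de la Mancha\n",  # added for contrast, mentions neither name
]

# First filter: lines mentioning Dulcinea.
dulcinea = [line for line in sample_lines if contains(line, "Dulcinea")]
# Second filter, chained on the first: lines that also mention Quijote.
both = [line for line in dulcinea if contains(line, "Quijote")]

print(len(dulcinea), len(both))
```

Because the match is case-sensitive, a line containing only `dulcinea` in lowercase would not be counted.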

### Exploring the superheroes dataset

In [15]:
from pyspark.sql.types import *

# To avoid copying a local file to all workers, we load the CSV into a pandas
# DataFrame on the driver and convert it to a Spark DataFrame.
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
superhero_pdf = pd.read_csv("../datasets/superheroes_info.csv", index_col='Index')


# We set the schema explicitly to avoid problems mapping pandas NaN values in string columns.
# Without it, Spark tries to treat NaN as DoubleType and fails with:
#   Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
mapping = {'object': StringType, 'float64': FloatType}
# Series.items() replaces iteritems(), which was removed in pandas 2.0
schema = StructType([StructField(name, mapping[dtype.name]())
                     for name, dtype in superhero_pdf.dtypes.items()])
superhero_df = spark.createDataFrame(superhero_pdf, schema=schema)

superhero_df.show(10)


+---------------+--------+--------+------+---------+------+------+------+--------+---------+---------+-------------+------+-----------+---------------+--------------------+
|           Name|Identity|  Status|Gender|Alignment|  Race|Height|Weight|EyeColor|HairColor|SkinColor|    Publisher|  Year|Appearances|FirstAppearance|      AdditionalData|
+---------------+--------+--------+------+---------+------+------+------+--------+---------+---------+-------------+------+-----------+---------------+--------------------+
|     Spider-Man|  Secret|  Living|  Male|     Good| Human| 178.0|  74.0|   Hazel|    Brown|      NaN|Marvel Comics|1962.0|     4043.0|     1962-08-01|        Peter Parker|
|     Spider-Man|  Secret|  Living|  Male|     Good| Human| 178.0|  77.0|     Red|    Brown|      NaN|Marvel Comics|1962.0|     4043.0|     1962-08-01|        Peter Parker|
|     Spider-Man|  Secret|  Living|  Male|     Good| Human| 157.0|  56.0|   Brown|    Black|      NaN|Marvel Comics|1962.0|     4043.0|
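
The type conflict mentioned in the code comments arises because pandas stores a missing value in a string column as a float `NaN`, so a single column mixes `str` and `float` values. The plain-Python sketch below shows that situation and the dtype-to-type lookup the notebook uses. The non-missing column values and the dtype listing are made up for illustration, and the Spark types are represented only by their names.

```python
# Hypothetical SkinColor values; the missing entry is a float NaN.
skin_color = ["Green", float("nan"), "Blue"]

# Inferring a type from the values finds two Python types in one column.
value_types = {type(value).__name__ for value in skin_color}

# Hypothetical pandas dtype names per column (as DataFrame.dtypes would report).
dtypes = {"Name": "object", "SkinColor": "object", "Height": "float64"}

# Same lookup idea as the notebook's mapping, with type names as strings.
mapping = {"object": "StringType", "float64": "FloatType"}
schema = [(name, mapping[dtype]) for name, dtype in dtypes.items()]

print(sorted(value_types))
print(schema)
```

Deriving the schema from the column dtypes rather than from the values sidesteps the mixed-type inference, since an `object` column is always declared as a string column.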

In [16]:
from pyspark.sql.functions import isnan, when, count, col
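df = superhero_df
# Count the rows per publisher. show() returns None, so keep the DataFrame
# in publisher_df and display it in a separate step.
publisher_df = superhero_df.groupby("Publisher").count()
publisher_df.show()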

+-----------------+-----+
|        Publisher|count|
+-----------------+-----+
|        Rebellion|    1|
|               DC| 6808|
|    HarperCollins|    7|
| J. R. R. Tolkien|    1|
|        Star Trek|    6|
|    Marvel Comics|  482|
|        Wildstorm|    3|
|       South Park|    1|
|    Sony Pictures|    2|
|      Titan Books|    1|
|      ABC Studios|    4|
|             SyFy|    5|
|     Image Comics|   15|
|Universal Studios|    1|
|   IDW Publishing|    4|
|           Marvel|16109|
|     NBC - Heroes|   19|
|              NaN|   13|
|    Hanna-Barbera|    1|
|        DC Comics|  241|
+-----------------+-----+
only showing top 20 rows



### Spark SQL Example

In [7]:
superhero_df.createOrReplaceTempView("superhero_table")
spark.sql("select Name,Gender,Status from superhero_table").show()

+---------------+------+--------+
|           Name|Gender|  Status|
+---------------+------+--------+
|     Spider-Man|  Male|  Living|
|     Spider-Man|  Male|  Living|
|     Spider-Man|  Male|  Living|
|Captain America|  Male|  Living|
|Captain America|  Male|  Living|
|Captain America|  Male|  Living|
|      Wolverine|  Male|  Living|
|      Wolverine|  Male|  Living|
|       Iron Man|  Male|  Living|
|       Iron Man|  Male|Deceased|
|       Iron Man|  Male|  Living|
|           Thor|  Male|  Living|
|           Thor|  Male|  Living|
|           Thor|  Male|  Living|
|           Thor|  Male|  Living|
| Benjamin Grimm|  Male|  Living|
| Benjamin Grimm|  Male|  Living|
| Benjamin Grimm|  Male|Deceased|
| Benjamin Grimm|  Male|  Living|
| Benjamin Grimm|  Male|  Living|
+---------------+------+--------+
only showing top 20 rows

