In [1]:
import pyspark
import pyspark.sql

In [2]:
conf = pyspark.SparkConf().setAppName('JugandoASerBroker').setMaster('local[*]') #Creamos la configuración
sc = pyspark.SparkContext(conf = conf) #Abrimos el contexto de Spark
sparkSession = pyspark.sql.SparkSession(sc, jsparkSession=None)
spark = sparkSession.builder.master("local").appName("holi")\
                    .config("spark.some.config.option", "some-value").getOrCreate()

In [3]:
prices = spark.read.format("parquet").load("./data/master/prices", header = True)

In [4]:
prices.printSchema()

root
 |-- date: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- open: string (nullable = true)
 |-- close: string (nullable = true)
 |-- low: string (nullable = true)
 |-- high: string (nullable = true)
 |-- volume: string (nullable = true)



In [5]:
companies = prices.select(prices['symbol']).distinct().toPandas()
companies['symbol'].values

array(['SPGI', 'ALXN', 'GIS', 'K', 'LEN', 'AIV', 'AVY', 'MMM', 'PKI',
       'PPG', 'RF', 'WEC', 'AXP', 'CI', 'IRM', 'PFG', 'PM', 'SNA', 'OXY',
       'BLK', 'EA', 'ESRX', 'ULTA', 'DUK', 'MTD', 'TROW', 'HAS', 'FLIR',
       'MAT', 'XL', 'EMN', 'KIM', 'EVHC', 'PNW', 'PRU', 'DGX', 'AAPL',
       'TSN', 'ALL', 'CLX', 'CSCO', 'XOM', 'BMY', 'CMG', 'FRT', 'MCHP',
       'YHOO', 'ADI', 'PHM', 'VNO', 'F', 'KORS', 'FTV', 'ROP', 'SYY',
       'TEL', 'JNPR', 'RTN', 'AGN', 'CHRW', 'EQR', 'SPG', 'NUE', 'ZBH',
       'FFIV', 'GILD', 'HOLX', 'NLSN', 'OKE', 'QCOM', 'AMGN', 'CCL',
       'NOV', 'PNC', 'ANTM', 'MNST', 'NTRS', 'SCG', 'JPM', 'CINF', 'EOG',
       'PVH', 'WFC', 'FTI', 'SYK', 'HSY', 'TWX', 'HAR', 'PRGO', 'DIS',
       'HAL', 'MPC', 'MNK', 'CMS', 'DHR', 'GGP', 'BSX', 'KSU', 'MUR',
       'NEM', 'T', 'UHS', 'XRX', 'ACN', 'COP', 'EXPE', 'LYB', 'MRK', 'SO',
       'UDR', 'WU', 'AIG', 'INTC', 'LEG', 'NEE', 'AFL', 'EMR', 'PG',
       'APH', 'DVN', 'HP', 'ROST', 'WY', 'WYN', 'EQIX', 'EXR', 'INTU',

In [6]:
google = prices.filter(prices['symbol'] == "GOOGL")
google.show(3)

+----------+------+----------+----------+----------+----------+---------+
|      date|symbol|      open|     close|       low|      high|   volume|
+----------+------+----------+----------+----------+----------+---------+
|2011-01-03| GOOGL|596.480018|604.350009|596.480018|605.589979|4725600.0|
|2011-01-04| GOOGL|605.619978|602.120002| 600.11999|606.180025|3645300.0|
|2011-01-05| GOOGL|600.069991|609.070016|600.049992|610.329985|5059500.0|
+----------+------+----------+----------+----------+----------+---------+
only showing top 3 rows



In [7]:
import pyspark.sql.functions as func

In [8]:
googleVOL = google.select(func.to_date(google['date']).alias("fecha"),
                       google['volume'])
googleVOL.show(2)

+----------+---------+
|     fecha|   volume|
+----------+---------+
|2011-01-03|4725600.0|
|2011-01-04|3645300.0|
+----------+---------+
only showing top 2 rows



In [9]:
googleVOL.select(googleVOL['fecha']).first()

Row(fecha=datetime.date(2011, 1, 3))

In [10]:
import datetime
date_from = datetime.datetime(2015, 9, 30)
date_to = datetime.datetime(2015, 11, 1)

In [11]:
googOCT15 = googleVOL.filter((googleVOL['fecha'] > date_from)).filter((googleVOL['fecha'] < date_to))
googOCT15.show(20)

+----------+---------+
|     fecha|   volume|
+----------+---------+
|2015-10-01|2125300.0|
|2015-10-02|2439400.0|
|2015-10-05|1898000.0|
|2015-10-06|2138700.0|
|2015-10-07|2217700.0|
|2015-10-08|1965400.0|
|2015-10-09|1606200.0|
|2015-10-12|1334200.0|
|2015-10-13|2122300.0|
|2015-10-14|1564600.0|
|2015-10-15|2357800.0|
|2015-10-16|1815800.0|
|2015-10-19|1508600.0|
|2015-10-20|2613300.0|
|2015-10-21|1597600.0|
|2015-10-22|4431600.0|
|2015-10-23|6336300.0|
|2015-10-26|2438800.0|
|2015-10-27|1938900.0|
|2015-10-28|1980200.0|
+----------+---------+
only showing top 20 rows



In [12]:
prices.createOrReplaceTempView("prices")

In [13]:
stock = spark.sql("SELECT date, close FROM prices WHERE symbol LIKE 'AAL' LIMIT 30")
stock.show(10)

+----------+-----+
|      date|close|
+----------+-----+
|2011-01-03|10.65|
|2011-01-04|10.62|
|2011-01-05| 11.1|
|2011-01-06|11.24|
|2011-01-07| 11.4|
|2011-01-10|11.47|
|2011-01-11|10.97|
|2011-01-12|10.88|
|2011-01-13|10.89|
|2011-01-14|10.79|
+----------+-----+
only showing top 10 rows



In [14]:
sc.stop()