# Investor - Flow of Funds - US

### Introduction:

Special thanks to: https://github.com/rgrp for sharing the dataset.

### Step 1. Import the necessary libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('funds').getOrCreate()
spark

### Step 2. Import the dataset from this [address](https://raw.githubusercontent.com/datasets/investor-flow-of-funds-us/master/data/weekly.csv). 

In [2]:
from pyspark import SparkFiles

### Step 3. Assign it to a variable called 

In [3]:
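url = "https://raw.githubusercontent.com/datasets/investor-flow-of-funds-us/master/data/weekly.csv"

# Download the remote file and distribute it; SparkFiles.get() then returns its local path.
spark.sparkContext.addFile(url)

# header=True takes column names from the first row; inferSchema=True detects the numeric column types.
df = spark.read.csv(SparkFiles.get("weekly.csv"), header=True, inferSchema=True, sep=',')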

In [4]:
df.show(5)

+----------+------------+---------------+------------+------+----------+------------+--------------+-----+
|      Date|Total Equity|Domestic Equity|World Equity|Hybrid|Total Bond|Taxable Bond|Municipal Bond|Total|
+----------+------------+---------------+------------+------+----------+------------+--------------+-----+
|2012-12-05|       -7426|          -6060|       -1367|   -74|      5317|        4210|          1107|-2183|
|2012-12-12|       -8783|          -7520|       -1263|   123|      1818|        1598|           219|-6842|
|2012-12-19|       -5496|          -5470|         -26|   -73|       103|        3472|         -3369|-5466|
|2012-12-26|       -4451|          -4076|        -375|   550|      2610|        3333|          -722|-1291|
|2013-01-02|      -11156|          -9622|       -1533|  -158|      2383|        2103|           280|-8931|
+----------+------------+---------------+------------+------+----------+------------+--------------+-----+
only showing top 5 rows



### Step 4. What is the frequency of the dataset?

In [5]:
# Weekly: consecutive dates in the output above are seven days apart.
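Rather than judging the frequency by eye, you can measure the gap between consecutive dates. The plain-Python sketch below does this for the five dates printed by `df.show(5)`. In PySpark, the same check could be written with `F.datediff` and `F.lag` over a `Window.orderBy('Date')`; that version is not shown here. Only the displayed rows are checked, so running the check over the whole column would be needed to detect any missing weeks.

```python
from datetime import date

# The first five dates printed by df.show(5).
dates = [date.fromisoformat(s) for s in
         ["2012-12-05", "2012-12-12", "2012-12-19", "2012-12-26", "2013-01-02"]]

# Number of days between each pair of consecutive observations.
gaps = [(later - earlier).days for earlier, later in zip(dates, dates[1:])]
print(gaps)  # [7, 7, 7, 7]

# A constant 7-day gap indicates weekly data.
is_weekly = all(g == 7 for g in gaps)
print(is_weekly)  # True
```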

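### Step 5. Set the column Date as the index.

Spark DataFrames have no index, so there is nothing to set here. `Date` stays an ordinary column, and ordering by it (`df.orderBy('Date')`) is the closest equivalent.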

### Step 6. What is the type of the index?

In [9]:
df.dtypes

[('Date', 'string'),
 ('Total Equity', 'int'),
 ('Domestic Equity', 'int'),
 ('World Equity', 'int'),
 ('Hybrid', 'int'),
 ('Total Bond', 'int'),
 ('Taxable Bond', 'int'),
 ('Municipal Bond', 'int'),
 ('Total', 'int')]
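`Date` is still a string at this point. ISO-8601 dates (`yyyy-MM-dd`) sort chronologically even as text, so `orderBy('Date')` would already give calendar order. The plain-Python sketch below checks this on the five dates shown earlier, deliberately shuffled. The property holds only because every field has a fixed, zero-padded width.

```python
from datetime import date

# Date strings in the CSV's yyyy-MM-dd format, deliberately out of order.
raw = ["2013-01-02", "2012-12-19", "2012-12-05", "2012-12-26", "2012-12-12"]

# Sorting the strings as text...
by_string = sorted(raw)
# ...gives the same order as sorting the parsed calendar dates.
by_date = [d.isoformat() for d in sorted(date.fromisoformat(s) for s in raw)]

print(by_string == by_date)  # True
print(by_string)
```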

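### Step 7. Set the index to a DatetimeIndex type

Spark has no `DatetimeIndex`. The equivalent step is to cast the `Date` column from string to `DateType`.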

In [10]:
import pyspark.sql.types as T

In [12]:
df = df.withColumn('Date', df.Date.cast(T.DateType()))
df.show(5)

+----------+------------+---------------+------------+------+----------+------------+--------------+-----+
|      Date|Total Equity|Domestic Equity|World Equity|Hybrid|Total Bond|Taxable Bond|Municipal Bond|Total|
+----------+------------+---------------+------------+------+----------+------------+--------------+-----+
|2012-12-05|       -7426|          -6060|       -1367|   -74|      5317|        4210|          1107|-2183|
|2012-12-12|       -8783|          -7520|       -1263|   123|      1818|        1598|           219|-6842|
|2012-12-19|       -5496|          -5470|         -26|   -73|       103|        3472|         -3369|-5466|
|2012-12-26|       -4451|          -4076|        -375|   550|      2610|        3333|          -722|-1291|
|2013-01-02|      -11156|          -9622|       -1533|  -158|      2383|        2103|           280|-8931|
+----------+------------+---------------+------------+------+----------+------------+--------------+-----+
only showing top 5 rows



In [13]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Total Equity: integer (nullable = true)
 |-- Domestic Equity: integer (nullable = true)
 |-- World Equity: integer (nullable = true)
 |-- Hybrid: integer (nullable = true)
 |-- Total Bond: integer (nullable = true)
 |-- Taxable Bond: integer (nullable = true)
 |-- Municipal Bond: integer (nullable = true)
 |-- Total: integer (nullable = true)
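The cast above converts each `yyyy-MM-dd` string into a date. A rough plain-Python analogue follows; the helper name `to_date_or_none` is hypothetical. Returning `None` for unparseable input approximates what Spark does with ANSI mode off (`spark.sql.ansi.enabled`): an invalid string cast to a date becomes null. With ANSI mode on, Spark typically raises an error instead. The two parsers also do not accept exactly the same set of input formats. If you want to state the format explicitly in PySpark, `F.to_date('Date', 'yyyy-MM-dd')` is an alternative to `cast`.

```python
from datetime import date
from typing import Optional


def to_date_or_none(value: str) -> Optional[date]:
    """Hypothetical helper: parse yyyy-MM-dd, returning None when parsing fails.

    Approximates a non-ANSI Spark cast to DateType, which yields null
    for strings it cannot parse.
    """
    try:
        return date.fromisoformat(value)
    except ValueError:
        return None


parsed = [to_date_or_none(s) for s in ["2012-12-05", "2013-01-02", "n/a"]]
print(parsed)  # [datetime.date(2012, 12, 5), datetime.date(2013, 1, 2), None]
```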



### Step 8.  Change the frequency to monthly, sum the values and assign it to monthly.

In [14]:
import pyspark.sql.functions as F

In [17]:
# Add integer month and year columns derived from Date.
df_new = df.withColumn('month', F.month('Date'))\
            .withColumn('year', F.year('Date'))
df_new.show(5)

+----------+------------+---------------+------------+------+----------+------------+--------------+-----+-----+----+
|      Date|Total Equity|Domestic Equity|World Equity|Hybrid|Total Bond|Taxable Bond|Municipal Bond|Total|month|year|
+----------+------------+---------------+------------+------+----------+------------+--------------+-----+-----+----+
|2012-12-05|       -7426|          -6060|       -1367|   -74|      5317|        4210|          1107|-2183|   12|2012|
|2012-12-12|       -8783|          -7520|       -1263|   123|      1818|        1598|           219|-6842|   12|2012|
|2012-12-19|       -5496|          -5470|         -26|   -73|       103|        3472|         -3369|-5466|   12|2012|
|2012-12-26|       -4451|          -4076|        -375|   550|      2610|        3333|          -722|-1291|   12|2012|
|2013-01-02|      -11156|          -9622|       -1533|  -158|      2383|        2103|           280|-8931|    1|2013|
+----------+------------+---------------+------------+------+----------+------------+--------------+-----+-----+----+
only showing top 5 rows

In [22]:
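# Build a 'year-month' label. The month is not zero-padded ('2013-1', not '2013-01'),
# so sorting on this string is lexicographic rather than chronological; this is why
# '2014-11' sorts before '2014-4' in the grouped output below.
cols = ['year','month']
df_month_sampled = df_new.withColumn('Yr_Mo', F.concat_ws("-",*cols)).drop("Date","year","month")
df_month_sampled.show(5)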

+------------+---------------+------------+------+----------+------------+--------------+-----+-------+
|Total Equity|Domestic Equity|World Equity|Hybrid|Total Bond|Taxable Bond|Municipal Bond|Total|  Yr_Mo|
+------------+---------------+------------+------+----------+------------+--------------+-----+-------+
|       -7426|          -6060|       -1367|   -74|      5317|        4210|          1107|-2183|2012-12|
|       -8783|          -7520|       -1263|   123|      1818|        1598|           219|-6842|2012-12|
|       -5496|          -5470|         -26|   -73|       103|        3472|         -3369|-5466|2012-12|
|       -4451|          -4076|        -375|   550|      2610|        3333|          -722|-1291|2012-12|
|      -11156|          -9622|       -1533|  -158|      2383|        2103|           280|-8931| 2013-1|
+------------+---------------+------------+------+----------+------------+--------------+-----+-------+
only showing top 5 rows
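Because `Yr_Mo` is compared as text, an unpadded month puts the labels out of calendar order. The plain-Python sketch below builds both key styles for month labels that occur in this dataset (2012-12, 2013-1, 2014-4, 2014-11) and sorts them. In PySpark, `F.date_format('Date', 'yyyy-MM')` could produce a zero-padded label in place of `concat_ws`. That substitution is a suggestion and is not what the notebook does.

```python
# (year, month) pairs for month labels that occur in this dataset.
year_months = [(2012, 12), (2013, 1), (2014, 4), (2014, 11)]

# Key built like concat_ws("-", year, month): the month is not zero-padded.
unpadded = sorted(f"{y}-{m}" for y, m in year_months)
# Key with a two-digit month, as date_format(Date, 'yyyy-MM') would produce.
padded = sorted(f"{y}-{m:02d}" for y, m in year_months)

print(unpadded)  # ['2012-12', '2013-1', '2014-11', '2014-4']  (out of calendar order)
print(padded)    # ['2012-12', '2013-01', '2014-04', '2014-11']  (chronological)
```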



In [25]:
# Sum every numeric column per Yr_Mo label; orderBy sorts the labels as strings.
monthly = df_month_sampled.groupBy("Yr_Mo").sum().orderBy("Yr_Mo")
monthly.show(5)

+-------+-----------------+--------------------+-----------------+-----------+---------------+-----------------+-------------------+----------+
|  Yr_Mo|sum(Total Equity)|sum(Domestic Equity)|sum(World Equity)|sum(Hybrid)|sum(Total Bond)|sum(Taxable Bond)|sum(Municipal Bond)|sum(Total)|
+-------+-----------------+--------------------+-----------------+-----------+---------------+-----------------+-------------------+----------+
|2012-12|           -26156|              -23126|            -3031|        526|           9848|            12613|              -2765|    -15782|
| 2013-1|             3661|               -1627|             5288|       2730|          12149|             9414|               2735|     18540|
|2014-11|            -2753|               -7239|             4485|        729|          14528|            11566|               2962|     12502|
| 2014-4|            10842|                1048|             9794|       4931|           8493|             7193|               1300|    
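For readers coming from pandas, here is the same group-and-sum logic in plain Python, applied to the five weekly rows printed by `df.show(5)`. Each row is keyed by the first seven characters of its ISO date (`yyyy-MM`). That key is already zero-padded, so it sorts chronologically. December 2012 falls entirely within those rows, so the `2012-12` totals match the first row of the `monthly` output above. The `2013-01` entry, however, covers only one week and is not the full month. The result is named `monthly_sketch` so that it does not overwrite the Spark `monthly` DataFrame.

```python
from collections import defaultdict

VALUE_COLS = ["Total Equity", "Domestic Equity", "World Equity", "Hybrid",
              "Total Bond", "Taxable Bond", "Municipal Bond", "Total"]

# The five weekly rows printed by df.show(5): (date, values in VALUE_COLS order).
sample_rows = [
    ("2012-12-05", [-7426, -6060, -1367, -74, 5317, 4210, 1107, -2183]),
    ("2012-12-12", [-8783, -7520, -1263, 123, 1818, 1598, 219, -6842]),
    ("2012-12-19", [-5496, -5470, -26, -73, 103, 3472, -3369, -5466]),
    ("2012-12-26", [-4451, -4076, -375, 550, 2610, 3333, -722, -1291]),
    ("2013-01-02", [-11156, -9622, -1533, -158, 2383, 2103, 280, -8931]),
]

# Running column-wise totals per 'yyyy-MM' key, starting from zeros.
monthly_sketch = defaultdict(lambda: [0] * len(VALUE_COLS))
for day, values in sample_rows:
    key = day[:7]  # 'yyyy-MM' prefix of the ISO date
    monthly_sketch[key] = [acc + v for acc, v in zip(monthly_sketch[key], values)]

for key in sorted(monthly_sketch):
    print(key, dict(zip(VALUE_COLS, monthly_sketch[key])))
```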

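### Step 9. You will notice that it filled the DataFrame with months that don't have any data with NaN. Let's drop these rows.

This prompt describes pandas `resample`, which inserts empty months. Spark's `groupBy` only creates groups for months that actually occur in the data, so there are no empty months to drop. Here `dropna()` only removes weekly rows that contain nulls before aggregating, and the output below is identical to Step 8's.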

In [26]:
monthly = df_month_sampled.dropna().groupBy("Yr_Mo").sum().orderBy("Yr_Mo")
monthly.show(5)

+-------+-----------------+--------------------+-----------------+-----------+---------------+-----------------+-------------------+----------+
|  Yr_Mo|sum(Total Equity)|sum(Domestic Equity)|sum(World Equity)|sum(Hybrid)|sum(Total Bond)|sum(Taxable Bond)|sum(Municipal Bond)|sum(Total)|
+-------+-----------------+--------------------+-----------------+-----------+---------------+-----------------+-------------------+----------+
|2012-12|           -26156|              -23126|            -3031|        526|           9848|            12613|              -2765|    -15782|
| 2013-1|             3661|               -1627|             5288|       2730|          12149|             9414|               2735|     18540|
|2014-11|            -2753|               -7239|             4485|        729|          14528|            11566|               2962|     12502|
| 2014-4|            10842|                1048|             9794|       4931|           8493|             7193|               1300|    
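Spark's `groupBy` only produces rows for months that occur in the data. To get pandas-style output with one row per calendar month, one approach is to generate the full month range and left-join the aggregates onto it. The plain-Python sketch below covers the first half of that: it generates every `yyyy-MM` key between the earliest and latest observed month and lists the months that have no data. The helper `month_range` is hypothetical. The observed keys are the four month labels visible in the output above, rewritten with two-digit months. Because the fifth row is cut off, the result is illustrative only.

```python
from typing import List


def month_range(start: str, end: str) -> List[str]:
    """Hypothetical helper: every zero-padded 'yyyy-MM' key from start to end, inclusive."""
    year, month = map(int, start.split("-"))
    end_year, end_month = map(int, end.split("-"))
    keys = []
    while (year, month) <= (end_year, end_month):
        keys.append(f"{year}-{month:02d}")
        # Step to the next calendar month, rolling over after December.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return keys


# Month labels visible in the output above, with two-digit months.
observed = {"2012-12", "2013-01", "2014-04", "2014-11"}

# Zero-padded keys compare correctly as strings, so min/max give the range ends.
all_months = month_range(min(observed), max(observed))
empty_months = [m for m in all_months if m not in observed]
print(len(all_months), len(empty_months))  # 24 20
print(empty_months[:3])  # ['2013-02', '2013-03', '2013-04']
```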

### Step 10. Good, now we have the monthly data. Now change the frequency to year.

In [27]:
df_new.columns

['Date',
 'Total Equity',
 'Domestic Equity',
 'World Equity',
 'Hybrid',
 'Total Bond',
 'Taxable Bond',
 'Municipal Bond',
 'Total',
 'month',
 'year']

In [29]:
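# Group by year alone. sum() with no arguments also sums the numeric `year`
# column itself, which is where the meaningless sum(year) column below comes from.
df_year_sampled = df_new.drop("Date","month").dropna().groupBy('year').sum().orderBy('year')
df_year_sampled.show(5)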

+----+-----------------+--------------------+-----------------+-----------+---------------+-----------------+-------------------+----------+---------+
|year|sum(Total Equity)|sum(Domestic Equity)|sum(World Equity)|sum(Hybrid)|sum(Total Bond)|sum(Taxable Bond)|sum(Municipal Bond)|sum(Total)|sum(year)|
+----+-----------------+--------------------+-----------------+-----------+---------------+-----------------+-------------------+----------+---------+
|2012|           -26156|              -23126|            -3031|        526|           9848|            12613|              -2765|    -15782|     8048|
|2013|             3661|               -1627|             5288|       2730|          12149|             9414|               2735|     18540|     4026|
|2014|              330|              -44689|            45021|      19570|          59890|            44994|              14896|     79787|    52364|
|2015|            15049|              -10459|            25508|       7280|          26028|   
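The `sum(year)` column above is an artifact. With no arguments, `GroupedData.sum()` aggregates every numeric column, including the `year` grouping column. In PySpark, passing the value columns explicitly (for example `.sum('Total Equity', 'Total')`) restricts the aggregation to those columns. The plain-Python sketch below shows the intended computation on two columns of the five rows printed earlier, with the year used only as the key. The 2012 figures match the table above, because all four 2012 rows are among those five. The 2013 figures reflect only the single 2013 row shown, so they differ from the table.

```python
# (date, Total Equity, Total) for the five weekly rows printed by df.show(5).
sample = [
    ("2012-12-05", -7426, -2183),
    ("2012-12-12", -8783, -6842),
    ("2012-12-19", -5496, -5466),
    ("2012-12-26", -4451, -1291),
    ("2013-01-02", -11156, -8931),
]

yearly_sketch = {}
for day, total_equity, total in sample:
    year = int(day[:4])  # the year is the grouping key, never a summed value
    acc = yearly_sketch.setdefault(year, [0, 0])
    acc[0] += total_equity
    acc[1] += total

for year in sorted(yearly_sketch):
    print(year, yearly_sketch[year])
```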

### BONUS: Create your own question and answer it.