# Lecture 1.4: Data Pipelines

In this lecture, we are going to implement a small data pipeline in python. It will extract raw text files from a "data warehouse", transform them, then load them into a different dataset. Then, we will automate this pipeline by scheduling it with cron.

**Learning goals:**
- implement a simple data pipeline
- write python functions
- write a python script
- learn python File I/O
- schedule a cron task with crontab

## 1 Reading Files

We are in the middle of a global pandemic, and there's a lot of information traded online, especially with so many of us confined to our homes. Twitter is a common place where data scientists gauge public opinion or group behaviour. In our data warehouse, we have ~100 tweets containing the `#corona` hashtag. We want to get some insights into the epidemic by analysing the text data.

For this, we'll have to open files in python. But first, let's investigate what we have in our data warehouse. 

ℹ️ You can run bash commands in a jupyter notebook by preceding them with a bang (`!`).

In [1]:
!ls data_warehouse

covid_tweets  tweets


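The directory we are interested in is `tweets` (`covid_tweets` only shows up if you have already run the pipeline from section 5). Let's see what's inside: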

In [2]:
!ls data_warehouse/tweets | head

0a63eb60b0ab4222ab6bb63bac648d8f.txt
0b0043cd1dbb413c95f5a6bf22a0683d.txt
13031cdfec3846b4849e0f2846e3c696.txt
1870e02eedb64721b3eccfde8245c8e8.txt
1d362230e83046668d13bc726159f499.txt
2196197adc4a4231a5b18425820de9a3.txt
2378c8aad6ce478f996c72c6c579220a.txt
23b10fcb7c6345e9ad8f2f0276e16d44.txt
24a31675e04c40d8a6d3e91c93d6b99a.txt
268fbf1747c049709e723537bce8c1f6.txt


Those are our tweets! Now we want to read one of them. We could use `cat` or a text editor, but let's try loading the data directly in python.

💪 With the help of this [documentation](https://docs.python.org/3/tutorial/inputoutput.html), open the contents of `0a63eb60b0ab4222ab6bb63bac648d8f.txt` in python! Display the tweet in clear text in the cell output.

In [23]:
with open('data_warehouse/tweets/0a63eb60b0ab4222ab6bb63bac648d8f.txt') as f:
    read_data = f.read()
    
    print(read_data)

Mrs Sonia Gandhi : This decision to remain at home will restrict transmission of this virus. All of us must spread awareness that frequent hand washing, not touching face &amp; reporting all flu/ influenza like conditions to medical help line or  doctor needs to be adhered to #Corona https://t.co/x9zU18P4oV


🧠 Congrats! You should have used the `with` keyword. If not, please try again. Can you explain why the  `with` keyword is considered good practice?
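
One way to explore this question is the minimal sketch below. It writes a throwaway file (the name `demo.txt` and the temporary directory are illustrative, not part of the lecture data) and checks that the file is closed as soon as the `with` block exits, even when an exception is raised inside it.

```python
import os
import tempfile

# Illustrative scratch file; not part of the data warehouse.
demo_path = os.path.join(tempfile.mkdtemp(), 'demo.txt')
with open(demo_path, 'w') as f:
    f.write('hello #corona\n')

# Normal exit: the file is closed automatically when the block ends.
with open(demo_path) as f:
    text = f.read()
print(f.closed)  # True

# Exit through an exception: the file is still closed.
try:
    with open(demo_path) as g:
        raise ValueError('something went wrong while processing')
except ValueError:
    pass
print(g.closed)  # True
```

Without `with`, we would have to remember to call `f.close()` ourselves, typically inside a `try`/`finally` block.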

💪 Now let's make a function to open a text file for a given path, which will be much easier to use. Copy your working code from the cell above to complete this function. Your function should return a `string` of the text inside the file.

In [24]:
def read_file(path):
    # INSERT YOUR CODE HERE
    
    with open(path) as f:
        read_data = f.read()
        return read_data

Let's try it out. If the following code cell doesn't return the tweet content, go back and try again!

In [25]:
read_file('data_warehouse/tweets/c03af4427b6e45728a483d7a75cc8651.txt')

"couldn't wait to turn 21 Sit in the backyard sun and drink beer with dad Drinking beer with dad Out there on the back porch swinging Drinking beer with dad #Corona\n"

Oh no! It looks like some of the `#corona` tweets are about beer 🍺, not the virus 😷... This might become a problem if we try to analyse the data. But fear not, we can _clean_ the data before we use it for anything. In this case, we want to filter out all tweets that are about beer rather than viruses. There are complex ways of doing this, but we'll stick to a simple solution: if a tweet contains the word `beer`, we'll assume it's talking about the corona brand rather than the COVID-19 virus. 

Our aim here is to build a **data pipeline** that will extract our tweets from the data warehouse, filter them, and load them again in a new `covid_tweets` directory.

## 2 Manipulating Text

In the previous section, we've extracted tweets from the data warehouse. Now we would like to filter out the ones about beer.

💪 Write a function that takes in text, and returns `True` if the text contains the word `beer`, `False` if it doesn't. Since we care about the meaning of the word rather than its spelling, keep in mind that it should work for all letter cases!


In [26]:
def contains_beer(text):
    # INSERT CODE HERE
    
    # lowercase the text first so that 'Beer', 'BEER', etc. all match
    return 'beer' in text.lower()

Now we want to test this function. For this, we can use a **unit test**. Unit tests are functions whose sole purpose is to test the behaviours of other functions and classes. They are crucial in software development because they allow us to catch bugs early. We'll cover unit testing in more detail later in the course, but you should remember that one of the main advantages of writing proper data pipelines instead of manually moving things around... is that we can write tests!

💪 The cell below defines and executes a unit test. Simply run the cell to test your `contains_beer()` function. If the test fails, try fixing your function above and try again!

In [27]:
def test_contains_beer():
    assert not contains_beer('This sentence is about viruses')
    assert contains_beer('This sentence is about beer')
    assert contains_beer('ThIs SeNtEnCe Is AbOuT BeeR!')
    print('Success! 🎉')
    
test_contains_beer()

Success! 🎉


🧠 Can you explain what the [`assert`](https://stackoverflow.com/questions/5142418/what-is-the-use-of-assert-in-python) keyword does in this unit test?
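
As a hint, here is a minimal sketch of `assert` in isolation. The function `check_positive` is a made-up example, unrelated to the pipeline.

```python
def check_positive(number):
    # assert does nothing when the condition is truthy...
    assert number > 0, 'number must be positive'
    return number

print(check_positive(3))  # 3

# ...and raises AssertionError (with the optional message) when it is falsy.
try:
    check_positive(-1)
except AssertionError as error:
    message = str(error)
    print('AssertionError:', message)
```

Note that assertions are skipped entirely when python runs with the `-O` flag, so they are meant for tests and sanity checks rather than for validating user input.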

## 3 Iterating Through Files

We can now read a tweet, and analyse if it speaks of viruses or beers. But our data pipeline must run on the entire dataset, so we have to iterate through all the files. We can use [glob](https://docs.python.org/3/library/glob.html), which allows us to search and list file paths.

The cell below lists the tweet file paths, i.e. all `.txt` files located under the directory `tweets`.



In [28]:
import glob

# get the list of tweet paths
tweet_paths = glob.glob('data_warehouse/tweets/*.txt')
# print the first ten
tweet_paths[:10]

['data_warehouse/tweets/cbd8dd478a834b909f002482e3510e43.txt',
 'data_warehouse/tweets/2d55ebdc03114cd6baea18078555bc01.txt',
 'data_warehouse/tweets/b403fe193f404364a84219325a40eb03.txt',
 'data_warehouse/tweets/bd17c356a0b84ccc84932d007ba19742.txt',
 'data_warehouse/tweets/7772fbc3bacb4e39a953a234a268b4b8.txt',
 'data_warehouse/tweets/2378c8aad6ce478f996c72c6c579220a.txt',
 'data_warehouse/tweets/38d5e757be7d44cb8ac52d52fa82b848.txt',
 'data_warehouse/tweets/b51a4ac0262e4721ae3fcbf4567c18f5.txt',
 'data_warehouse/tweets/d96c3e8dfa164d328a1b8b3a3f97baf2.txt',
 'data_warehouse/tweets/9764f0cad6b04e159e70956f9f892661.txt']

Indeed, we can see the list of tweet files, just like with the `ls` unix command, but in python. Now that we can list, read, and filter files, we are close to finishing our data pipeline! Let's figure out how many of our tweets are about beer rather than viruses.
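
Notice that the paths above are not in alphabetical order: the order in which `glob` returns results depends on the file system. The sketch below, which uses a throwaway directory with made-up file names, shows one way to get a reproducible order by wrapping the call in `sorted()`.

```python
import glob
import os
import tempfile

# Build a small throwaway directory (file names are made up for the example).
demo_dir = tempfile.mkdtemp()
for name in ['b.txt', 'a.txt', 'notes.md', 'c.txt']:
    with open(os.path.join(demo_dir, name), 'w') as f:
        f.write('placeholder\n')

# The pattern keeps only the .txt files; sorted() fixes the order.
paths = sorted(glob.glob(os.path.join(demo_dir, '*.txt')))
names = [os.path.basename(p) for p in paths]
print(names)  # ['a.txt', 'b.txt', 'c.txt']
```

For our filter the order doesn't matter, but a stable order makes outputs easier to compare between runs.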

💪💪 In the cell below, write a for loop which iterates through the `tweet_paths` and for each tweet path:
- read the file
- print the text if it contains beer

In [29]:
# INSERT YOUR CODE HERE

for path in tweet_paths:
    
    text = read_file(path)
    
    if contains_beer(text):
        print(text)

Hey give me another Coors how about you Angie I don't like beer Wow man I believe the little lady here wants champagne Hey yeah could I have champagne #Corona

Beer drinkers, hell raisers, yeah. #corona Well, baby, don't you want to come with me?

My bucket's got a hole in it But my bucket's got a hole in it My bucket's got a hole in it Can't buy no beer #Corona

#corona One bourbon, one scotch, and one beer Well, my baby, she gone, gone tonight I ain't seen the girl since night before last I wanna get drunk, get off of my mind

So I got this twelve pack (burr) We blowin' smoke in the air, drinkin' ice cold beer #Corona Got ya girl in my ear sayin', I just wanna party

#corona I hardly ever sing beer drinking songs And when they play them cheatin' tunes, I never sing along I never ever sing the blues I've forgotten, born to lose And I hardly ever sing beer drinking songs

couldn't wait to turn 21 Sit in the backyard sun and drink beer with dad Drinking beer with dad Out there on the back porch swinging Drinking beer with dad #Corona

🧠 You should have found seven. Now that we know which tweets are not about COVID-19, we could manually remove each one of these text files. Can you think of reasons why that's a bad idea in practice?
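
Rather than counting the printed tweets by eye, we could let python do the counting. The sketch below is self-contained, so it uses a short made-up list of texts; in the notebook, the texts would come from calling `read_file(path)` on each entry of `tweet_paths`.

```python
def contains_beer(text):
    return 'beer' in text.lower()

# Made-up stand-ins for the tweet texts read from the data warehouse.
texts = [
    'Stay home and wash your hands #corona',
    'Ice cold BEER on the porch #Corona',
    'Beer drinkers, hell raisers #corona',
]

# Keep only the texts that mention beer, then count them.
beer_texts = [text for text in texts if contains_beer(text)]
print(len(beer_texts))  # 2
```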

## 4 Writing Files

Recall that a data pipeline extracts, transforms, and loads data. It is considered bad practice to modify datasets in place (🧠 Can you tell why that's a bad idea?). This means data pipelines can _filter_ data by simply not loading the undesirable data in the final stage. In other words, to clean our tweets, we are _only_ going to copy tweets about viruses to a new dataset. To do this, we need to know how to write files.


💪 With the help of this [documentation](https://docs.python.org/3/tutorial/inputoutput.html), code a function to write content to files. The function should take two arguments: first, the path of the file to create, then the content we wish to write.

In [10]:
def write_file(path, content):
    # INSERT CODE HERE
    
    # 'w' overwrites any existing file, so re-running the cell doesn't duplicate content
    with open(path, 'w') as f:
        f.write(content)

We can check that the function worked by using the [`cat`](https://en.wikipedia.org/wiki/Cat_(Unix)) unix command.

In [11]:
write_file('current_mood.txt', 'Learning about data pipelines and beer! 🍻\n')
!cat current_mood.txt

Learning about data pipelines and beer! 🍻


If the string isn't shown in the cell output, modify your `write_file` function and try again!
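
If `cat` shows the same line several times after re-running the cell, the file was probably opened in append mode. The sketch below compares the `'w'` and `'a'` modes on a scratch file; `mood.txt`, `write_with_mode` and `read_lines` are illustrative names, not part of the exercise.

```python
import os
import tempfile

# Illustrative scratch file in a temporary directory.
mood_path = os.path.join(tempfile.mkdtemp(), 'mood.txt')

def write_with_mode(path, content, mode):
    with open(path, mode) as f:
        f.write(content)

def read_lines(path):
    with open(path) as f:
        return f.read().splitlines()

# 'w' truncates the file first, so repeated runs leave a single line.
for _ in range(3):
    write_with_mode(mood_path, 'beer\n', 'w')
lines_after_w = read_lines(mood_path)
print(len(lines_after_w))  # 1

# 'a' appends, so every run adds another copy.
for _ in range(3):
    write_with_mode(mood_path, 'beer\n', 'a')
lines_after_a = read_lines(mood_path)
print(len(lines_after_a))  # 4
```

For a pipeline that may run many times over the same files, overwriting is usually the safer default, since running it twice then gives the same result as running it once.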

We've learned that versioning our data pipelines is important. It's also useful to add some details about what the data represents. We want to add a simple `README.txt` file to the new dataset with this information. 

🧠 Why is versioning our data pipelines important?

💪 Write a function which can write a `README.txt` for a given output path, and a data pipeline version. Pro-tip: you can reuse the `write_file` function you wrote above. Be as verbose as you wish with the content! One example might be:
> #corona tweets about covid19.  
> Beer tweets filtered with filter_beer v1.0


In [12]:
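def write_readme(output_dir, version):
    # INSERT CODE HERE
    
    # describe the dataset and record which pipeline version produced it
    content = '#corona tweets about covid19.\nBeer tweets filtered with filter_beer v' + version + '\n'
    with open(output_dir + '/README.txt', 'w') as f:
        f.write(content)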

If the function works, the unit test below should pass. If not, please try again!

In [13]:
def test_write_readme():
    write_readme('.', '1.0')
    content = !cat README.txt
    !rm README.txt
    assert content
    print('Success! 🎉')
    
test_write_readme()

Success! 🎉


🧠🧠 Observe that the `content` variable is _not_ a boolean. Can you explain what the `assert` does here?
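
As a hint: in a notebook, `content = !cat README.txt` captures the command output as a list-like object holding the output lines. The sketch below uses plain python lists as stand-ins, and the helper `is_non_empty` is a made-up name, to show how `assert` treats such values.

```python
def is_non_empty(lines):
    # assert evaluates the truthiness of its argument:
    # an empty list is falsy, a list with at least one item is truthy.
    try:
        assert lines
    except AssertionError:
        return False
    return True

print(is_non_empty(['#corona tweets about covid19.']))  # True
print(is_non_empty([]))  # False
```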

## 5 Data Pipelines

It's time to write our data pipeline! 

💪💪 Write a function called `filter_beer` which takes in an input directory path (the input dataset folder), and an output directory path (the output dataset folder). The function should iterate through the files found in the `input_dir`, read their contents, check if they are about beer, and only write those about viruses back into the `output_dir`. Finally, the function should write a `README.txt` file with extra versioning and dataset information.

This may seem like a lot, but you have already written most of it in your functions above! Make sure to re-use them here. This includes `read_file()`, `contains_beer()`, `write_file()`, and `write_readme()`. You'll also need to use the `glob` library to iterate through the files.

I've also included the `file_name` function, which returns the file name for a given path, e.g. `some/path/to/a/file.txt` returns `file.txt`. This should come in handy!

In [1]:
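import glob
import os

def file_name(path):
    return os.path.basename(path)

def filter_beer(input_dir, output_dir):
    # INSERT CODE HERE
    
    # extract: list every tweet file in the input dataset
    tweet_paths = glob.glob(os.path.join(input_dir, '*.txt'))
    
    for path in tweet_paths:
        text = read_file(path)
        # transform: skip the tweets that are about beer
        if not contains_beer(text):
            # load: write the tweet under the same file name in the output dataset
            write_file(os.path.join(output_dir, file_name(path)), text)
    
    # document the new dataset and the pipeline version that produced it
    write_readme(output_dir, '1.0')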
    

The unit test below will check if your function correctly filtered the beer tweets out of the new `covid_tweets` dataset. If it fails, please try again!

In [15]:
def test_filter_beer():
    # make new dataset directory
    !mkdir -p data_warehouse/covid_tweets
    # clean it out just in case
    !rm -f data_warehouse/covid_tweets/*.txt
    # run the filter_beer data pipeline
    filter_beer('data_warehouse/tweets', 'data_warehouse/covid_tweets')
    # list and count the new files
    result = !ls data_warehouse/covid_tweets | wc -l
    covid_tweets_count = int(result[0])
    print(covid_tweets_count)
    # check that the count is correct
    assert covid_tweets_count == 101
    
test_filter_beer()

101
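
The test above relies on the notebook's `!` shell commands, so it cannot run outside of jupyter. Below is a sketch of an equivalent check in plain python. It is self-contained, so it redefines compact versions of the helper functions, and it uses two made-up tweets in temporary directories instead of the real data warehouse. It also checks two properties discussed in this lecture: the input dataset is left untouched, and running the pipeline twice gives the same result.

```python
import glob
import os
import tempfile

def read_file(path):
    with open(path) as f:
        return f.read()

def write_file(path, content):
    with open(path, 'w') as f:
        f.write(content)

def contains_beer(text):
    return 'beer' in text.lower()

def filter_beer(input_dir, output_dir):
    for path in glob.glob(os.path.join(input_dir, '*.txt')):
        text = read_file(path)
        if not contains_beer(text):
            write_file(os.path.join(output_dir, os.path.basename(path)), text)
    write_file(os.path.join(output_dir, 'README.txt'),
               'Beer tweets filtered with filter_beer v1.0\n')

# Made-up tweets standing in for the data warehouse.
fake_tweets = {
    'virus.txt': 'Wash your hands #corona\n',
    'beer.txt': 'Cold BEER on the porch #corona\n',
}

with tempfile.TemporaryDirectory() as input_dir, \
        tempfile.TemporaryDirectory() as output_dir:
    for name, text in fake_tweets.items():
        write_file(os.path.join(input_dir, name), text)

    filter_beer(input_dir, output_dir)
    filter_beer(input_dir, output_dir)  # a second run must not change the result

    output_files = sorted(os.listdir(output_dir))
    input_files = sorted(os.listdir(input_dir))
    kept_text = read_file(os.path.join(output_dir, 'virus.txt'))

print(output_files)  # ['README.txt', 'virus.txt']
```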


## 6 Automation

We now have a data pipeline to clean our tweets. It's still fairly inconvenient to have to run this function every time new tweets come in (and there are a _lot_ of tweets posted every second!). We want to automate the data pipeline using [`cron`](https://en.wikipedia.org/wiki/Cron). For this, we first have to take our function, and turn it into a python script. 

💪 Copy and paste your `filter_beer` function into the `filter_beer.py` file in this directory. I've already added other useful functions inside of it, so you shouldn't have to copy any of your other functions. To do this, directly use your shell or file explorer. You can use any text editor, [sublime](https://www.sublimetext.com/) is a good choice!  
Pro-tip: if you are looking for the file's path, remember you can run `!pwd` in a code cell of this notebook.

As you may have noticed from the `main()` function, this script runs the data pipeline from a directory called `input` into a directory called `output`. There is one tweet in `input` right now, so let's see if the script works. The following cell should return the file name of one tweet (plus any files left over in `output` if you have run this notebook before). If not, try again!
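
The contents of `filter_beer.py` are not reproduced in this notebook, so the sketch below is only an assumption about how such a script could be laid out, not the actual file. It shows the usual pattern: the pipeline lives in a function, `main()` wires in the directory names, and the `if __name__ == '__main__':` guard makes sure the pipeline only runs when the file is executed as a script.

```python
import glob
import os

def filter_beer(input_dir, output_dir):
    """Copy every .txt file that does not mention beer into output_dir."""
    for path in glob.glob(os.path.join(input_dir, '*.txt')):
        with open(path) as f:
            text = f.read()
        if 'beer' not in text.lower():
            with open(os.path.join(output_dir, os.path.basename(path)), 'w') as f:
                f.write(text)

def main(input_dir='input', output_dir='output'):
    # Assumed behaviour: create the output directory if it is missing.
    os.makedirs(output_dir, exist_ok=True)
    filter_beer(input_dir, output_dir)

# Only run the pipeline when the file is executed as a script
# (python filter_beer.py), not when it is imported from a notebook or a test.
if __name__ == '__main__':
    main()
```

Keeping the directory names as parameters of `main()` means the same code can be pointed at temporary directories in a test.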

In [16]:
# this is how to run python code files
!python filter_beer.py
!ls output

360e1cc69cd14994895291ebd75976a3.txt  65fc22b30e2e46fb85bba02d81f914bd.txt
571d832e07bc451caa1bc9ecd98ef4c5.txt


Now that we can run our data pipeline as a python script, we can add it to our [`crontab`](https://www.adminschoice.com/crontab-quick-reference). The cell below pulls your python binary and directory information to produce the appropriate `crontab` line.

In [17]:
import shlex
python_path = !which python
where = !pwd
# quote each path so that directories containing spaces survive in the crontab line
script = where[0] + '/filter_beer.py'
line = ' '.join(['* * * * *', 'cd', shlex.quote(where[0]), '&&', shlex.quote(python_path[0]), shlex.quote(script)])
print(line)

* * * * * cd '/home/bilalmotiwala/Desktop/Machine Learning/iml/introduction-to-machine-learning/data_engineering/lecture1.4' && /home/bilalmotiwala/.local/share/virtualenvs/introduction-to-machine-learning-RiBp7zxe/bin/python '/home/bilalmotiwala/Desktop/Machine Learning/iml/introduction-to-machine-learning/data_engineering/lecture1.4/filter_beer.py'


The `* * * * *` at the start of the line tells `cron` that we wish to run this command _every minute_. Please note that this is not a realistic data pipeline scenario, and that to get real-time processing of data one should consider streaming solutions. But `cron` is a simple and effective solution if you want to automate your data pipelines and need them to run every day, every week, and so on. Nevertheless, for the purpose of illustration, we'll keep our schedule fast so that we can (almost) immediately see results.
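
To make the schedule syntax more concrete, the sketch below splits a crontab line into its five schedule fields (minute, hour, day of month, month, day of week) and the command. The names `CRON_FIELDS` and `split_cron_line` are made up for this illustration, and the function only splits the line, it does not validate the values.

```python
CRON_FIELDS = ['minute', 'hour', 'day_of_month', 'month', 'day_of_week']

def split_cron_line(line):
    """Split a crontab line into its five schedule fields and the command."""
    parts = line.split(None, 5)
    if len(parts) < 6:
        raise ValueError('expected five schedule fields followed by a command')
    schedule = dict(zip(CRON_FIELDS, parts[:5]))
    return schedule, parts[5]

# every minute, as in this lecture
every_minute = split_cron_line('* * * * * python filter_beer.py')
# every day at 03:00
daily_3am = split_cron_line('0 3 * * * python filter_beer.py')
# every Monday at 06:30
weekly = split_cron_line('30 6 * * 1 python filter_beer.py')

print(daily_3am[0])
```

A `*` means "every value" for that field, which is why five stars translate to "every minute of every hour of every day".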

💪 Add the output of the cell above to your `crontab` file. To do this, run `crontab -e` in your shell and paste the line with your chosen editor. If you are getting errors on the Ubuntu shell for Windows, please try [this](https://stackoverflow.com/questions/41281112/crontab-not-working-with-bash-on-ubuntu-on-windows).

Now let's check to see if our scheduled data pipeline works! Run the cell below to copy a tweet into the `input` directory.

In [18]:
!cp data_warehouse/tweets/65fc22b30e2e46fb85bba02d81f914bd.txt input
!ls input

360e1cc69cd14994895291ebd75976a3.txt  65fc22b30e2e46fb85bba02d81f914bd.txt
571d832e07bc451caa1bc9ecd98ef4c5.txt  6601dc27f19347aab553582228a79f1c.txt


The `input` directory should now contain 2 files. Now your automated data pipeline should be in the process of extracting, transforming, and loading your data! **Please wait a couple of minutes** before running the cell below (or your cron job won't have had time to run yet).

In [19]:
!ls output

360e1cc69cd14994895291ebd75976a3.txt  65fc22b30e2e46fb85bba02d81f914bd.txt
571d832e07bc451caa1bc9ecd98ef4c5.txt


This should return _the same_ files as for the `input` directory above. If not, please update your `crontab` and try again!

We have shown that our data pipeline moved files from one dataset to another, but we haven't proved that it can successfully filter out tweets about beers.

💪 Using `cp`, copy two files from the `tweets` dataset in the data warehouse to the `input` directory in the cell below. The first tweet should be `571d832e07bc451caa1bc9ecd98ef4c5.txt` (about viruses), the second `6601dc27f19347aab553582228a79f1c.txt` (about beer). Then check that they were correctly copied using `ls`.

In [20]:
# INSERT CODE HERE
!cp data_warehouse/tweets/571d832e07bc451caa1bc9ecd98ef4c5.txt input
!cp data_warehouse/tweets/6601dc27f19347aab553582228a79f1c.txt input
!ls input

360e1cc69cd14994895291ebd75976a3.txt  65fc22b30e2e46fb85bba02d81f914bd.txt
571d832e07bc451caa1bc9ecd98ef4c5.txt  6601dc27f19347aab553582228a79f1c.txt


Once you have made sure that the new files were added to the `input` directory, **please wait a couple of minutes**, then run the cell below.

In [21]:
!ls output

360e1cc69cd14994895291ebd75976a3.txt  65fc22b30e2e46fb85bba02d81f914bd.txt
571d832e07bc451caa1bc9ecd98ef4c5.txt


🧠 What happened? Is this what you expected? Please summarise every step involved in the process.

🧠🧠 Can you list ways in which an orchestration framework (e.g: `airflow`, `luigi`) would improve our `cron` scheduling?

💪 Feel free to add more files to `input` and check what is filtered from `output`. You can even write your own tweets! 

Last but not least, let's remove our data pipeline scheduling so that your machine doesn't try to filter beer tweets every minute for the rest of time 😅.

**Warning**: this will delete your crontab entries. If you are a regular crontab user, please remove the appropriate line using `crontab -e` instead. If you have never used crontab before, no worries, please continue! 
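
If you would rather remove only the pipeline line without opening an editor, the idea can be sketched as a small text filter. The function `drop_cron_job` and the example crontab below are made up for illustration; the function works on plain text and does not touch your real crontab.

```python
def drop_cron_job(crontab_text, marker):
    """Return crontab_text without the lines that contain marker."""
    kept = [line for line in crontab_text.splitlines() if marker not in line]
    return ('\n'.join(kept) + '\n') if kept else ''

# Made-up crontab with one unrelated job and our pipeline job.
example_crontab = (
    '0 3 * * * /usr/bin/backup.sh\n'
    '* * * * * cd /some/dir && python /some/dir/filter_beer.py\n'
)

cleaned = drop_cron_job(example_crontab, 'filter_beer.py')
print(cleaned)
```

To apply this for real, one could feed it the output of `crontab -l` and install the result by piping it to `crontab -`, which reads the new table from standard input on most systems. Double-check the cleaned text before installing it.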

💪 Run the cell below to clear out your crontab and remove the data pipeline automation.

In [22]:
!crontab -r

no crontab for bilalmotiwala


## 7 Summary

Today, we have learned about **data pipelines**, and how they are a key part of modern data architectures. We have seen how they can be chained into **complex workflows**, often **orchestrated** by frameworks. We have seen how they can be improved by **automation**, **testing**, and **versioning**. We also heard a couple of tips on how to avoid writing complicated, obscure code and instead aim for **straightforward, reusable tools**.

In this notebook we also implemented a simple data pipeline to filter out tweets about beer in our `#corona` twitter dataset. To do this, we used python file i/o functions to iterate through, read, and write files. Finally, we scheduled the python script in `cron`, and saw our automated data pipeline in action! 🤖

# Resources
## Core Resources
- [**Slides**](https://docs.google.com/presentation/d/1KLTh7NLoTnKFRZsIIthi8YEH_l4U1NEZzwmctqWrUeo/edit?usp=sharing)
- [What is a data pipeline](https://www.alooma.com/blog/what-is-a-data-pipeline)
- [python i/o documentation](https://docs.python.org/3/tutorial/inputoutput.html) 
- [glob docs](https://docs.python.org/3/library/glob.html)
- [crontab for WSL](https://stackoverflow.com/questions/41281112/crontab-not-working-with-bash-on-ubuntu-on-windows)

## Additional Resources

- [crontab guru](https://crontab.guru/)  
Simple website to test your crontab configurations
- [A beginner's guide to data engineering](https://medium.com/@rchang/a-beginners-guide-to-data-engineering-part-i-4227c5c457d7)  
In-depth three-part post on data pipelines at Airbnb
- [Orchestrating batch processing pipelines with cron and make](https://snowplowanalytics.com/blog/2015/10/13/orchestrating-batch-processing-pipelines-with-cron-and-make/)  
Lightweight DIY alternative to data pipeline management tools.
- [Airflow a cron replacement for data pipelines](https://medium.com/@rbahaguejr/airflow-a-beautiful-cron-alternative-or-replacement-for-data-pipelines-b6fb6d0cddef)  
Simple working example of data pipeline management with Airflow.
- [Luigi vs Airflow](https://towardsdatascience.com/data-pipelines-luigi-airflow-everything-you-need-to-know-18dc741449b7)  
Summary with code of the differences between two popular data pipeline management tools.