### **Caveats**: The subprocess commands in this lecture may not run on your local PC. Microsoft Windows does not conform to POSIX by default, so there is no `ls` binary. `subprocess` therefore cannot find the file `ls` and raises a `FileNotFoundError`.

For example, my PC running Windows 7 cannot execute `ls` at all.

*  You can install [Microsoft's Bash on Windows](https://docs.microsoft.com/en-us/windows/wsl/install-win10), which gives you `ls` and probably many of the other shell commands used in this lecture.
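
One way to avoid the `FileNotFoundError` is to check whether an executable exists before calling it. The sketch below is only an illustration; the helper name `command_available` is hypothetical and not part of the lecture.

```python
import shutil


def command_available(name: str) -> bool:
    """Return True if an executable called `name` is found on the PATH."""
    return shutil.which(name) is not None


# Usually True on a POSIX system; may be False on plain Windows.
print(command_available("ls"))
```

A script can use such a check to print a helpful message instead of failing inside `subprocess`.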

# **Using `IPython` with shell commands**

To get the output of the disk free command, `df`, in human-readable form, put "`!`" in front of the `df` command and use the `-h` flag.

In [None]:
!df -h

Filesystem      Size  Used Avail Use% Mounted on
overlay         108G   39G   70G  36% /
tmpfs            64M     0   64M   0% /dev
tmpfs           6.4G     0  6.4G   0% /sys/fs/cgroup
shm             5.9G     0  5.9G   0% /dev/shm
tmpfs           6.4G   24K  6.4G   1% /var/colab
/dev/sda1       114G   41G   74G  36% /etc/hosts
tmpfs           6.4G     0  6.4G   0% /proc/acpi
tmpfs           6.4G     0  6.4G   0% /proc/scsi
tmpfs           6.4G     0  6.4G   0% /sys/firmware


*   Notice how the output is whitespace-delimited.
*   The output is in a columnar format.
*   `IPython` gives us the ability to parse that columnar structure into an `SList` data type. 
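
To make the idea of whitespace-delimited columns concrete, here is a plain-Python sketch that splits a few `df -h` lines into columns. It does not use `SList`; the sample text is copied from the output above, and the variable names are illustrative assumptions.

```python
df_output = """\
Filesystem      Size  Used Avail Use% Mounted on
overlay         108G   39G   70G  36% /
tmpfs            64M     0   64M   0% /dev
/dev/sda1       114G   41G   74G  36% /etc/hosts
"""

lines = df_output.splitlines()
# Skip the header line, then split every row on runs of whitespace
rows = [line.split() for line in lines[1:]]
# Column 1 (zero-based) is the Size column
sizes = [row[1] for row in rows]
print(sizes)
```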

# **Capturing output from IPython shell commands**

*  The output of shell commands can be captured by assigning them to a variable. 
*  They are stored as a `Python` data type called `IPython.utils.text.SList`. 

For example,



In [None]:
ls = !ls
df = !df -h
print(type(ls),type(df))

<class 'IPython.utils.text.SList'> <class 'IPython.utils.text.SList'>


# **The "!" can be used only in the `IPython` shell or a `Jupyter Notebook`.**

*  If you use this syntax in a Python script, e.g., `my_script.py`, it will raise a `SyntaxError`.
*  To accomplish the same thing in a Python script, use the `subprocess` module instead.
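
As a rough script-side counterpart of `var = !command`, the sketch below captures a command's output with `subprocess.run` and splits it into lines. Using `sys.executable` as the child command is an assumption made for portability, so the example does not depend on `ls` being installed.

```python
import subprocess
import sys

# Run a child process and capture its standard output as text
completed = subprocess.run(
    [sys.executable, "-c", "print('one'); print('two')"],
    capture_output=True,
    text=True,
)

# A list of lines, similar in spirit to what IPython stores in an SList
lines = completed.stdout.splitlines()
print(lines)
```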

# **Passing programs to the Python interpreter**
The Python interpreter can be run in two ways. 

*   The first way is to pass it a script, e.g.,

`python my_script.py`

*   The second way is to pass it a program as a string via the `-c` flag.

In [None]:
!python -c "from datetime import datetime; import pytz; tz = pytz.timezone('Asia/Bangkok'); print(datetime.now(tz).strftime('%H:%M:%S'))"

17:11:39


`python -c` can be useful for testing small snippets of code, or for creating shell aliases in a `bash` or `zsh` config file.

# **Execute Python Commands**

An IPython script, `script.ipy`, has been provided for you to work in. You can run it with `ipython script.ipy` in the console.

In [None]:
!python3 -c "from random import choices;days = ['Mo', 'Tu', 'We', 'Th', 'Fr'];print(choices(days))"

['We']


# **Execute IPython Shell Commands**

*  You can determine how many files of a specific type live in a directory by combining `!ls` with the built-in `len()` function.
*  The output of `!ls` is list-like, so you can store it in a variable.



*   List the files in the `sample_data` directory that have the extension `.csv`. Store the result of the command in the variable `var`.

*   Run `len()` on `var` to print the number of `.csv` files that live in the directory.

In [None]:
var = !ls -h sample_data/*.csv
print(var)
print(len(var))

e = []
for name in var:
    e.extend(name.split())
print(e)


['sample_data/california_housing_test.csv   sample_data/mnist_test.csv', 'sample_data/california_housing_train.csv  sample_data/mnist_train_small.csv']
2
['sample_data/california_housing_test.csv', 'sample_data/mnist_test.csv', 'sample_data/california_housing_train.csv', 'sample_data/mnist_train_small.csv']


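This result is not neat: the captured `ls` output packs several file names into each line, so `len(var)` reports 2 lines rather than 4 files, and each line has to be split as shown above. You can use `!ls */*.csv` to list the `csv` files; if you store the result in a variable, `len(var)` gives the count, provided each file name lands on its own line (the `-1` option of `ls` forces one name per line).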
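
For comparison, the same count can be obtained without the shell. This is a sketch using the standard `glob` module; the helper name `count_files` and the temporary files are assumptions made for the example.

```python
import glob
import os
import tempfile


def count_files(directory, pattern="*.csv"):
    """Count the entries in `directory` whose names match `pattern`."""
    return len(glob.glob(os.path.join(directory, pattern)))


# Build a throwaway directory with two .csv files and one .txt file
with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.csv", "b.csv", "notes.txt"):
        open(os.path.join(tmp, name), "w").close()
    csv_count = count_files(tmp)

print(csv_count)
```

Because `glob` returns one list item per file, the count does not depend on how `ls` lays out its columns.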

# **Capture IPython Shell Output**

*    In the cell below an initial `ls` command will emit the results to standard out. 
*    First, write a test file.

In [None]:
%%file numbers.txt
1 12
2 18
3 45
4 5
5 71
6 96
7 13
8 12

Writing numbers.txt


In [None]:
!ls -l

total 8
-rw-r--r-- 1 root root   38 Jun 11 14:13 numbers.txt
drwxr-xr-x 1 root root 4096 Jun  1 13:40 sample_data


*  Next, the `awk` command adds up the values in the fifth column of every line piped into it and prints the sum.
*  The column in question is a column of the `ls -l` output, ***not a column of numbers in the file***.
*  Column 5 is the file size in bytes.
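
To see what this column sum does, here is a plain-Python sketch applied to the `ls -l` listing shown above. The function `column_sum` is a hypothetical helper; unlike `awk`, it counts a non-numeric field as 0 rather than using a leading numeric prefix, so a column such as `14:13` would give a different sum.

```python
ls_output = """\
total 8
-rw-r--r-- 1 root root   38 Jun 11 14:13 numbers.txt
drwxr-xr-x 1 root root 4096 Jun  1 13:40 sample_data
"""


def column_sum(text, column):
    """Sum a 1-based, whitespace-delimited column; skip missing or non-integer fields."""
    total = 0
    for line in text.splitlines():
        fields = line.split()
        if len(fields) >= column:
            try:
                total += int(fields[column - 1])
            except ValueError:
                pass  # e.g. "root" or "Jun" contribute nothing
    return total


print(column_sum(ls_output, 5))
```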

In [None]:
!ls -l | awk '{ SUM+=$5} END {print SUM}'

4134


*  The number 4134 above comes from 38 + 4096.
*  Try the other columns as examples.

In [None]:
!ls -l | awk '{ SUM+=$1} END {print SUM}' 
!ls -l | awk '{ SUM+=$2} END {print SUM}' 
!ls -l | awk '{ SUM+=$3} END {print SUM}' 
!ls -l | awk '{ SUM+=$4} END {print SUM}' 
!ls -l | awk '{ SUM+=$5} END {print SUM}' 
!ls -l | awk '{ SUM+=$6} END {print SUM}' 
!ls -l | awk '{ SUM+=$7} END {print SUM}' 
!ls -l | awk '{ SUM+=$8} END {print SUM}' 
!ls -l | awk '{ SUM+=$9} END {print SUM}' 
!ls -l | awk '{ SUM+=$10} END {print SUM}' 

0
10
0
0
4134
0
12
27
0
0


The `grep` command filters the standard output of the `ls` command, passing only the lines for `.txt` files or for the `sample` folder on to the third command, `awk`, which summarizes them.

In [None]:
!ls -l | grep .txt | awk '{ SUM+=$5} END {print SUM}' 
!ls -l | grep .sample | awk '{ SUM+=$5} END {print SUM}' 
!ls -l | grep .txt | awk '{ SUM+=$2} END {print SUM}' 
!ls -l | grep .sample | awk '{ SUM+=$2} END {print SUM}'
!ls -l | grep .txt | awk '{ SUM+=$7} END {print SUM}' 
!ls -l | grep .sample | awk '{ SUM+=$7} END {print SUM}'

38
4096
1
1
11
1


In [None]:
!ls -l | awk '{ SUM+=$1} END {print SUM}' numbers.txt
!ls -l | awk '{ SUM+=$2} END {print SUM}' numbers.txt

36
272

Here a file name is passed to `awk`, so `awk` reads `numbers.txt` and ignores the piped `ls -l` output: 36 and 272 are the sums of the first and second columns of the file.


# **Capturing shell output with bash magic function**

Another way to capture the standard output of shell commands in `IPython` is the `%%bash` magic command with the `--out` flag.

In [None]:
%%bash --out output
ls -l | awk '{ SUM+=$5} END {print SUM}'

['4134']

In [None]:
ls_count = !ls -l | awk '{ SUM+=$5} END {print SUM}'
display(type(ls_count))
display(ls_count)

IPython.utils.text.SList

['4134']

The command below writes to `STDERR`.
  *   `STDERR` isn't captured by `--out`, so the error message is still displayed.

In [None]:
%%bash --out output
ls --turbo

ls: unrecognized option '--turbo'
Try 'ls --help' for more information.


Below, the `bash` magic function runs a code block and captures `standard output` and `standard error` in separate variables.

In [None]:
%%bash --out output --err error
ls -l | awk '{ SUM+=$5} END {print SUM}'
echo "no error so far" >&2

4134
no error so far
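
Outside IPython, the two streams can be captured separately with `subprocess.run`. The sketch below assumes a small Python child process (started through `sys.executable`) that writes one line to each stream; it stands in for the `ls | awk` pipeline above.

```python
import subprocess
import sys

# The child prints to standard output and to standard error
child_code = "import sys; print('4134'); print('no error so far', file=sys.stderr)"

completed = subprocess.run(
    [sys.executable, "-c", child_code],
    capture_output=True,  # capture stdout and stderr separately
    text=True,            # decode bytes to str
)

output = completed.stdout.strip()
error = completed.stderr.strip()
print(f"output={output!r} error={error!r}")
```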


You can use `%%bash` magic syntax to capture the output of a script in IPython.

In [None]:
%%file script.ipy
%%bash --out output
echo "Running Directory Audit Script"
CSV=`ls -l test_dir/*.csv | wc -l`
TXT=`ls -l test_dir/*.txt | wc -l`
echo 'The directory contains this a total # *.csv files: ' $CSV
echo 'The directory contains this a total # *.txt files: ' $TXT

Writing script.ipy


When run, the line `%%bash --out output` above executes the code block with `bash` and stores its standard output in the variable `output`.

In [None]:
!echo "Running Directory Audit Script"
CSV= !ls -l *.csv | wc -l
TXT= !ls -l *.txt | wc -l
!echo 'The directory contains this a total # *.csv files: ' $CSV
!echo 'The directory contains this a total # *.txt files: ' $TXT

Running Directory Audit Script
The directory contains this a total # *.csv files:  [5]
The directory contains this a total # *.txt files:  [6]


The `awk` command can be used to filter some patterns. Use the `!` operator to create a command that sums the total size of the files in a directory.
Make sure you pipe a command into `awk` using the | operator.

In [None]:
!ls -l | awk '{ SUM+=$5} END {print SUM}'

8887


#  **SList methods**
When output from a shell command is captured in IPython, the resulting `SList` has the
*  `fields` method 
*  `grep` method 
*  `sort` method

## **SList fields**

The `fields` method can simulate the `awk` command. A good example is grabbing just the modification date and time, plus the file name, for a few `ls` entries by selecting fields 5 through 8.

   *  In `fields()` the index counts from 0, whereas `awk` counts from 1.

In [None]:
ls = !ls -l
ls.fields(5, 6, 7, 8)[0:4]

['Jun 11 15:21 file_0.csv',
 'Jun 11 15:21 file_1.txt',
 'Jun 11 15:22 file_2.csv',
 'Jun 11 15:23 file_3.txt']
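
The zero-based indexing can be illustrated with a plain-Python stand-in for `fields`. This is a sketch of the idea, not IPython's implementation; the function name and sample lines are assumptions.

```python
def fields(lines, *indexes):
    """Pick zero-based, whitespace-delimited fields from each line and re-join them."""
    picked = []
    for line in lines:
        parts = line.split()
        picked.append(" ".join(parts[i] for i in indexes if i < len(parts)))
    return picked


listing = [
    "-rw-r--r-- 1 root root   12 Jun 11 15:21 file_1.txt",
    "-rw-r--r-- 1 root root   38 Jun 11 14:13 numbers.txt",
]

print(fields(listing, 5, 6, 7, 8))
# Index 4 here corresponds to $5 in awk: the size in bytes
print(fields(listing, 4))
```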

## **SList grep**

When you pass in a portion of a word or phrase, `grep` finds the entries that match that portion.

In [None]:
ls = !ls -l
ls.grep(".txt")

['-rw-r--r-- 1 root root   12 Jun 11 15:21 file_1.txt',
 '-rw-r--r-- 1 root root   12 Jun 11 15:23 file_3.txt',
 '-rw-r--r-- 1 root root   12 Jun 11 15:24 file_5.txt',
 '-rw-r--r-- 1 root root   12 Jun 11 15:25 file_7.txt',
 '-rw-r--r-- 1 root root   12 Jun 11 15:25 file_9.txt',
 '-rw-r--r-- 1 root root   38 Jun 11 14:13 numbers.txt']
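
One detail worth keeping in mind: if the pattern is treated as a regular expression (my understanding of how `SList.grep` handles a string, stated here as an assumption), the dot in `.txt` matches any character. The plain-Python sketch below shows the difference between a loose and an anchored pattern; the `grep` function and file names are illustrative.

```python
import re


def grep(lines, pattern):
    """Keep the lines in which the regular expression `pattern` is found."""
    return [line for line in lines if re.search(pattern, line)]


names = ["file_1.txt", "file_2.csv", "my_txt_notes.md"]

loose = grep(names, ".txt")       # "." matches any character, so "_txt" also matches
strict = grep(names, r"\.txt$")   # a literal dot, anchored at the end of the name
print(loose)
print(strict)
```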

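Here the word `kill` is used as the pattern, as you would to find utilities that kill Unix processes. Nothing in the current directory listing matches, so the result is an empty list.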

In [None]:
ls = !ls -l
ls.grep("kill")

[]

## **SList sort**

Perform sorting on the output of a shell command.

In [None]:
disk_usage = !df -h
disk_usage

['Filesystem      Size  Used Avail Use% Mounted on',
 'overlay         108G   39G   70G  36% /',
 'tmpfs            64M     0   64M   0% /dev',
 'tmpfs           6.4G     0  6.4G   0% /sys/fs/cgroup',
 'shm             5.9G     0  5.9G   0% /dev/shm',
 'tmpfs           6.4G   28K  6.4G   1% /var/colab',
 '/dev/sda1       114G   41G   74G  36% /etc/hosts',
 'tmpfs           6.4G     0  6.4G   0% /proc/acpi',
 'tmpfs           6.4G     0  6.4G   0% /proc/scsi',
 'tmpfs           6.4G     0  6.4G   0% /sys/firmware']

*  The first argument of `SList.sort()` selects which column to sort on, for example the Size or the Used column.
*  The second argument, `nums`, selects whether to sort by numerical rather than alphabetical values.

In [None]:
disk_usage.sort(5, nums = True)

['/dev/sda1       114G   41G   74G  36% /etc/hosts',
 'Filesystem      Size  Used Avail Use% Mounted on',
 'overlay         108G   39G   70G  36% /',
 'shm             5.9G     0  5.9G   0% /dev/shm',
 'tmpfs            64M     0   64M   0% /dev',
 'tmpfs           6.4G     0  6.4G   0% /proc/acpi',
 'tmpfs           6.4G     0  6.4G   0% /proc/scsi',
 'tmpfs           6.4G     0  6.4G   0% /sys/firmware',
 'tmpfs           6.4G     0  6.4G   0% /sys/fs/cgroup',
 'tmpfs           6.4G   28K  6.4G   1% /var/colab']
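
Sorting on a column can also be sketched with the built-in `sorted()` and a key function. The example below sorts a few `df -h` rows by the `Use%` column, numerically and in descending order; the helper `use_percent` is a hypothetical name.

```python
rows = [
    "overlay         108G   39G   70G  36% /",
    "tmpfs            64M     0   64M   0% /dev",
    "tmpfs           6.4G   28K  6.4G   1% /var/colab",
]


def use_percent(line):
    """Return the Use% column (zero-based field 4) as an integer."""
    return int(line.split()[4].rstrip("%"))


by_usage = sorted(rows, key=use_percent, reverse=True)
for row in by_usage:
    print(row)
```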

### **Use SList fields to parse shell output**
*  Capture the output of the disk free command, `df -h`, and store it in `disk_space`.
*  Use the `.fields()` method on `disk_space` to select a column. Index 0, used below, is the filesystem name; index 1 would be the total size of the mounted volumes.

In [None]:
disk_space = !df -h
print(disk_space.fields(0))

['Filesystem', 'overlay', 'tmpfs', 'tmpfs', 'shm', 'tmpfs', '/dev/sda1', 'tmpfs', 'tmpfs', 'tmpfs']


### **Find Python files using SList grep**

* Use the SList `.grep()` method to keep only the files matching the pattern `.py`.

In [None]:
res = !ls  # to search elsewhere, put the directory after !ls
print(res.grep(".py"))

['file_0.csv  file_3.txt\tfile_6.csv  file_9.txt\t script.ipy', 'file_1.txt  file_4.csv\tfile_7.txt  numbers.txt  script.py']


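Why doesn't this work? Each captured line holds several file names, so `grep` returns whole lines that also contain the non-matching names. With `ls -l`, each file gets its own line: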

In [None]:
res = !ls -l
print(res.grep('.py'))

['-rw-r--r-- 1 root root  255 Jun 11 15:45 script.ipy', '-rw-r--r-- 1 root root  262 Jun 11 15:43 script.py']


### **Using SList to grep**



In [None]:
import os
help(os.path.join)

Help on function join in module posixpath:

join(a, *p)
    Join two or more pathname components, inserting '/' as needed.
    If any component is an absolute path, all previous path components
    will be discarded.  An empty last part will result in a path that
    ends with a separator.



In [None]:
%%file slist_out.py

from subprocess import Popen, PIPE
from IPython.utils.text import SList

root = "/content"

# List the directory and capture standard output as bytes
p = Popen(["ls", "-l", root], stdout=PIPE)
bytes_out = p.stdout.readlines()

# Decode each line from bytes to str before wrapping it in an SList
out = []
for line in bytes_out:
    out.append(line.decode("utf-8"))

slist_out = SList(out)


Overwriting slist_out.py


In [None]:
import os
from slist_out import slist_out

# Save the name of the root directory
root = "/content"

# Find the entries containing ".csv" in slist_out
result = slist_out.grep(".csv")

# Extract the filenames
for res in result:
    filename = res.split()[-1]

    # Create the full path
    fullpath = os.path.join(root, filename)
    print(f"fullpath of backup file: {fullpath}")
    


#  **Using `subprocess.run`**

`subprocess.run()` takes a list of strings and runs the command. By default it does not capture the output.

In [None]:
import subprocess
subprocess.run(["ls", "-l"])

CompletedProcess(args=['ls', '-l'], returncode=0)

### **Byte strings are the default in `subprocess`**

In [None]:
res = b'repl 24 0.0 0.0 36072 3144 pts/0 R+ 03:15 0:00 ps aux\n'
print(type(res))
print(res)

<class 'bytes'>
b'repl 24 0.0 0.0 36072 3144 pts/0 R+ 03:15 0:00 ps aux\n'


When dealing with byte strings, they will need to be `decoded` in many cases to process them further. 

In [None]:
res = b'repl 24 0.0 0.0 36072 3144 pts/0 R+ 03:15 0:00 ps aux\n'
regular_string = res.decode("utf-8")
print(type(regular_string))
print(regular_string)

<class 'str'>
repl 24 0.0 0.0 36072 3144 pts/0 R+ 03:15 0:00 ps aux
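
Decoding can fail when the bytes are not valid in the chosen encoding. The sketch below shows the failure and the `errors="replace"` option of `bytes.decode`; the sample byte strings are made up for illustration.

```python
good = b"caf\xc3\xa9\n"   # valid UTF-8 for "café"
bad = b"caf\xe9\n"        # Latin-1 encoded, not valid UTF-8

print(good.decode("utf-8").strip())

try:
    bad.decode("utf-8")
except UnicodeDecodeError as exc:
    print(f"cannot decode: {exc.reason}")

# Replace undecodable bytes with U+FFFD instead of raising
replaced = bad.decode("utf-8", errors="replace")
print(replaced.strip())
```

Alternatively, passing `text=True` to `subprocess.run` makes it return `str` objects directly.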



# **Unix status codes**

*   Unix commands return a status code that represents the status of their completion. 
*   If a command exits with a ***zero*** status, it was successful.
*   If a command exits with ***non-zero*** status it has failed. 

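The status code of the last command run can be printed to standard output with `echo $?`. Note that in IPython each `!` line runs in its own shell, so `$?` on a separate line does not see the exit status of the previous `!` command; this is why the second `echo $?` below still prints 0 even though `ls --bogus-flag` failed.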

In [None]:
!ls -l
!echo $?

!ls --bogus-flag
!echo $?

total 16
-rw-r--r-- 1 root root  643 Jun 12 14:12 aa.md
-rw-r--r-- 1 root root    3 Jun 12 14:13 f_0.txt
drwxr-xr-x 1 root root 4096 Jun 12 14:16 sample_data
-rw-r--r-- 1 root root  643 Jun 12 14:13 sample_data.md
0
ls: unrecognized option '--bogus-flag'
Try 'ls --help' for more information.
0


# **Execute shell commands in subprocess**

*  When running a command with `subprocess.run()` the status code of the unix shell command can be captured and inspected.

*  This is accomplished via the `CompletedProcess` object.
*  This object contains not only the `returncode`, or `exit status` of the child process, but also the `arguments` used to launch the process.



In [None]:
import subprocess
out = subprocess.run(["ls", "-l"])
display(out)
print(out.returncode)

CompletedProcess(args=['ls', '-l'], returncode=0)

0


# **Non-zero status codes in `subprocess.run`**

A non-zero exit code (2 in the run below) is returned because the `ls` command was passed an option, `--turbo`, that it does not recognize.



In [None]:
import subprocess
bad_out = subprocess.run(["ls", "--turbo"])
print(bad_out.returncode)

2
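
Instead of inspecting `returncode` by hand, `subprocess.run` can raise an exception on failure when `check=True` is passed. The sketch below assumes a small Python child process that exits with status 3, so it does not depend on the exact exit code of `ls`.

```python
import subprocess
import sys

# A command that always fails with exit status 3
failing = [sys.executable, "-c", "import sys; sys.exit(3)"]

# Without check=True the failure is only visible in returncode
result = subprocess.run(failing)
print(result.returncode)

# With check=True a non-zero status raises CalledProcessError
try:
    subprocess.run(failing, check=True)
except subprocess.CalledProcessError as exc:
    caught_code = exc.returncode
    print(f"command failed with status {caught_code}")
```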


### **Permission check**

You want to verify that when the network filesystem is mounted on a new system, each worker node is able to create files with the correct permissions.

*   Import the `subprocess` and `os` packages.
*   Write a script that uses the Unix `touch` command in `subprocess.Popen` to create a file.
*   Then use the `os.stat` function to check that the file was created with the correct `uid`.
If the `uid` is correct, print out a success message.

In [None]:
# Import packages
import subprocess
import os

# Setup
file_location = "/content/file_1.txt"
expected_uid = 1000

# Create a file and wait for `touch` to finish before inspecting it
proc = subprocess.Popen(["touch", file_location])
proc.wait()

# Check user permissions
stat = os.stat(file_location)
if stat.st_uid == expected_uid:
    print(f"File System exported properly: {expected_uid} == {stat.st_uid}")
else:
    print(f"File System NOT exported properly: {expected_uid} != {stat.st_uid}")


File System NOT exported properly: 1000 != 0
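
The check reports a mismatch here because the notebook runs as root (uid 0) rather than uid 1000. A variation, sketched below for a POSIX system with `touch` available, compares the file owner with the uid of the current process from `os.getuid()` instead of a hard-coded value. That comparison and the temporary path are assumptions made for this example.

```python
import os
import subprocess
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "probe.txt")

    # subprocess.run waits for touch to exit before returning
    subprocess.run(["touch", path], check=True)

    owner_uid = os.stat(path).st_uid
    matches = owner_uid == os.getuid()

print(f"owner uid {owner_uid} matches current user: {matches}")
```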


In [None]:
# Generate poem.txt
with open('poem.txt', 'w') as f:
    for i in range(1000):
        f.write("All work and no play makes Jack a dull boy.\n")

### **Reading a creepy AI poem**
*   The Unix command `head` reads the first few lines of a file.
*   The Unix command `wc -w` counts the total number of words; the cell below uses `wc -c`, which counts bytes instead.
*   The poem is stored in a file called `poem.txt`.
*   Use `subprocess.Popen` to run each of these shell commands and print the results. You must pass `stdout=subprocess.PIPE` into `Popen` to capture the output of `wc`.
  *   Print the first few lines of `/content/poem.txt` using `head`.
  *   Count the contents of `/content/poem.txt` using `wc` and print out the total.
  *   Safely execute these commands by passing them in as items in a list.

In [None]:
import subprocess

# Execute Unix command `head` safely as items in a list
with subprocess.Popen(["head", "/content/poem.txt"], stdout=subprocess.PIPE) as head:
  
    # Print each line of list returned by `stdout.readlines()`
    for line in head.stdout.readlines():
        print(line)
    
# Execute Unix command `wc -c` safely as items in a list
with subprocess.Popen(["wc", "-c", "/content/poem.txt"], stdout=subprocess.PIPE) as word_count:
  
    # Print the string output of standard out of `wc -c`
    print(word_count.stdout.read())


b'All work and no play makes Jack a dull boy.\n'
b'All work and no play makes Jack a dull boy.\n'
b'All work and no play makes Jack a dull boy.\n'
b'All work and no play makes Jack a dull boy.\n'
b'All work and no play makes Jack a dull boy.\n'
b'All work and no play makes Jack a dull boy.\n'
b'All work and no play makes Jack a dull boy.\n'
b'All work and no play makes Jack a dull boy.\n'
b'All work and no play makes Jack a dull boy.\n'
b'All work and no play makes Jack a dull boy.\n'
b'44000 /content/poem.txt\n'


### **Running processes script**

Write a script using `subprocess.Popen` and `ps aux` that discards every line of process output containing the string 'python'. This will hide your secret Python scripts.

*   Use `subprocess.Popen` to execute `ps aux`.
*   Discard lines with `'python'` in them.
*   Print out all other lines.

In [None]:
import subprocess

# Use subprocess to run the `ps aux` command that lists running processes
with subprocess.Popen(["ps", "aux"], stdout=subprocess.PIPE) as proc:
    process_output = proc.stdout.readlines()
    
# Look through each line in the output and skip it if it contains "python"
for line in process_output:
    if b"python" in line:
        continue
    print(line)


b'USER         PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND\n'
b'root           1  0.1  0.4 347492 57092 ?        Ssl  13:12   0:07 /tools/node/bin/node /datalab/web/app.js\n'
b'root          16  0.0  0.0  35888  4764 ?        Ss   13:12   0:00 tail -n +0 -F /root/.config/Google/DriveFS/Logs/dpb.txt /root/.config/Google/DriveFS/Logs/drive_fs.txt\n'
b'root          50  0.0  0.0 710124  8496 ?        Sl   13:12   0:02 /usr/local/bin/dap_multiplexer --domain_socket_path=/tmp/debugger_resm0z4z8\n'
b'root        1075  0.0  0.0  59040  6240 ?        R    14:46   0:00 ps aux\n'


This is what it looks like if we do not skip the line with `'python'`.

In [None]:
for line in process_output:
    print(line)

b'USER         PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND\n'
b'root           1  0.1  0.4 347492 57092 ?        Ssl  13:12   0:07 /tools/node/bin/node /datalab/web/app.js\n'
b'root          16  0.0  0.0  35888  4764 ?        Ss   13:12   0:00 tail -n +0 -F /root/.config/Google/DriveFS/Logs/dpb.txt /root/.config/Google/DriveFS/Logs/drive_fs.txt\n'
b'root          49  0.1  0.4 195440 61144 ?        Sl   13:12   0:07 /usr/bin/python2 /usr/local/bin/jupyter-notebook --ip="172.28.0.2" --port=9000 --FileContentsManager.root_dir="/" --MappingKernelManager.root_dir="/content"\n'
b'root          50  0.0  0.0 710124  8496 ?        Sl   13:12   0:02 /usr/local/bin/dap_multiplexer --domain_socket_path=/tmp/debugger_resm0z4z8\n'
b'root          81  0.0  0.0      0     0 ?        Z    13:13   0:02 [python3] <defunct>\n'
b'root         574  0.1  0.0      0     0 ?        Z    14:16   0:01 [python3] <defunct>\n'
b'root         824  0.6  0.8 702528 118556 ?       Ssl  14:34   0:04 /u

# **Capture output of shell commands using the `subprocess.Popen` module**

*  Import `Popen` and `PIPE` from `subprocess`.
*  Set the `stdout` parameter to `PIPE`, which allows the output to be captured.
*  Then use `Popen` to run the command and store its output in the variable `out` as a list.



In [None]:
from subprocess import Popen, PIPE

with Popen(["ls"], stdout=PIPE) as proc:
    out = proc.stdout.readlines()
print(out)  

[b'aa.md\n', b'f_0.txt\n', b'file_1.txt\n', b'poem.txt\n', b'sample_data\n', b'sample_data.md\n']


# **`subprocess.Popen.communicate()`**

*  Below you can see how it is used to communicate with both `stdout` and `stderr` by waiting for up to 30 seconds. 
*  If it exceeds this duration it will throw an Exception. 
*  This Exception, `TimeoutExpired` can be caught and then a code block can perform some other action like 
  *  killing the process, 
  *  cleaning up resources used or 
  *  other business logic. 

In [None]:
from subprocess import Popen, PIPE, TimeoutExpired

proc = Popen(["ls"], stdout=PIPE)
# Attempt to communicate for up to 30 seconds
try:
    out, err = proc.communicate(timeout=30)
except TimeoutExpired:
    # Kill the process since a timeout was triggered
    proc.kill()
    # Collect whatever output remains after the kill
    out, err = proc.communicate()

print(out)
print(err)

b'aa.md\nf_0.txt\nfile_1.txt\npoem.txt\nsample_data\nsample_data.md\n'
None


#  **`subprocess.PIPE`**

*  Passing `subprocess.PIPE` (a constant, not a function) as `stdout` is required for capturing the output.

#  **Required components of `subprocess.Popen`**

*  `stdout.read()` : returns the output as a single `bytes` object
*   `stdout.readlines()` : returns the output as a list of lines
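
The difference between the two calls can be checked directly. The sketch below uses a small Python child process (via `sys.executable`, an assumption made for portability) that prints two lines.

```python
import sys
from subprocess import Popen, PIPE

cmd = [sys.executable, "-c", "print('a'); print('b')"]

# read() returns everything as one bytes object
with Popen(cmd, stdout=PIPE) as proc:
    whole = proc.stdout.read()

# readlines() returns a list with one bytes object per line
with Popen(cmd, stdout=PIPE) as proc:
    lines = proc.stdout.readlines()

print(type(whole), whole)
print(type(lines), lines)
```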

### **Optional `stderr`**

In [None]:
from subprocess import Popen, PIPE

with Popen(["ls", "/a/bad/path"], stdout=PIPE, stderr=PIPE) as proc:
    print(proc.stderr.read())

b"ls: cannot access '/a/bad/path': No such file or directory\n"


### **Using subprocess Popen**

Use `subprocess.Popen()` and the command `pip list --format=json` to find all of the installed packages. Because the `pip` tool emits `JSON` data, you can use `json.loads()` to convert it to Python objects, here a list of dictionaries.

*  Use the `with` context manager to run `subprocess.Popen()`.
*  Read the piped standard output of the process as a list of lines.
*  Convert the JSON payload with `json.loads` after extracting the only element of the `result` list.
*  Print the result using the `pprint` function of the `pprint` package.

In [None]:
from subprocess import Popen, PIPE
import json
import pprint

# Use the with context manager to run subprocess.Popen()
with Popen(["pip","list","--format=json"], stdout=PIPE) as proc:
    # Read the piped standard output as a list of lines
    result = proc.stdout.readlines()

# Convert the JSON payload (a single line) to a list of dictionaries
converted_result = json.loads(result[0])

# Display the result in the IPython terminal
pprint.pprint(converted_result)

[{'name': 'absl-py', 'version': '0.12.0'},
 {'name': 'alabaster', 'version': '0.7.12'},
 {'name': 'albumentations', 'version': '0.1.12'},
 {'name': 'altair', 'version': '4.1.0'},
 {'name': 'appdirs', 'version': '1.4.4'},
 {'name': 'argon2-cffi', 'version': '20.1.0'},
 {'name': 'arviz', 'version': '0.11.2'},
 {'name': 'astor', 'version': '0.8.1'},
 {'name': 'astropy', 'version': '4.2.1'},
 {'name': 'astunparse', 'version': '1.6.3'},
 {'name': 'async-generator', 'version': '1.10'},
 {'name': 'atari-py', 'version': '0.2.9'},
 {'name': 'atomicwrites', 'version': '1.4.0'},
 {'name': 'attrs', 'version': '21.2.0'},
 {'name': 'audioread', 'version': '2.1.9'},
 {'name': 'autograd', 'version': '1.3'},
 {'name': 'Babel', 'version': '2.9.1'},
 {'name': 'backcall', 'version': '0.2.0'},
 {'name': 'beautifulsoup4', 'version': '4.6.3'},
 {'name': 'bleach', 'version': '3.3.0'},
 {'name': 'blis', 'version': '0.4.1'},
 {'name': 'bokeh', 'version': '2.3.2'},
 {'name': 'Bottleneck', 'version': '1.3.2'},
 {
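
Once the payload is parsed, it is often handy to turn the list of dictionaries into a lookup table. The sketch below uses a short hand-written payload in the same shape as the `pip list --format=json` output above, not a real `pip` call.

```python
import json

# A small sample in the same shape as the pip output shown above
payload = b'[{"name": "absl-py", "version": "0.12.0"}, {"name": "attrs", "version": "21.2.0"}]\n'

# json.loads accepts bytes as well as str
packages = json.loads(payload)

# Map each package name to its version
versions = {pkg["name"]: pkg["version"] for pkg in packages}
print(versions["attrs"])
```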

### **Waiting for processes**

*   Start a long running process using `subprocess.Popen()`.
*   Linux `sleep` command will suspend execution of a shell for a period of time.
*   Use `Popen.communicate()` with the `timeout` argument to create a timeout.
*   Cleanup the process if it takes longer than the timeout.
*   Print error message and standard out and standard error streams.

In [None]:
from subprocess import (Popen, PIPE, TimeoutExpired)

# Start a long running process using subprocess.Popen()
proc = Popen(["sleep", "2"], stdout=PIPE, stderr=PIPE)

# Use Popen.communicate() to create a timeout
try:
    output, error = proc.communicate(timeout=1)

except TimeoutExpired:

    # Cleanup the process if it takes longer than the timeout
    proc.kill()

    # Read standard out and standard error streams and print
    output, error = proc.communicate()
    print(f"Process timed out with output: {output}, error: {error}")


Process timed out with output: b'', error: b''


### **Detecting duplicate files with `subprocess`**

The `md5sum` utility is a shell command that computes the MD5 checksum of a file. Files with identical contents produce the same checksum.

*  Iterate over the list of files, `files`.
*  Use `Popen` to call the `md5sum` utility.
*  Append each duplicate file to a list.
*  Print the duplicates out.

Below we create some simple files to see how we can detect duplicate files.

In [None]:
%%file /content/sample_data/f_0.txt
1
2

Overwriting /content/sample_data/f_0.txt


In [None]:
%%file /content/sample_data/f_1.txt
1
2

Overwriting /content/sample_data/f_1.txt


In [None]:
%%file /content/sample_data/f_2.txt
1
2

Overwriting /content/sample_data/f_2.txt


In [None]:
from subprocess import Popen, PIPE
import os

# Don't change these variables
checksums = {}
duplicates = []
root = "/content/sample_data"
files = [os.path.join(root, f) for f in os.listdir(root)]
# Iterate over the list of files filenames
for filename in files:
    # Use Popen to call the md5sum utility
    with Popen(["md5sum", filename], stdout=PIPE) as proc:
        try:
            checksum, _ = proc.stdout.read().split()
        except ValueError:  # md5sum wrote nothing usable to stdout (for example, the entry is a directory), so skip it
            continue
        # Append duplicate to a list if the checksum is found
        if checksum in checksums:
            duplicates.append(filename)
        checksums[checksum] = filename

print(f"Found Duplicates: {duplicates}")

Found Duplicates: ['/content/sample_data/f_0.txt', '/content/sample_data/f_1.txt']
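
The same duplicate check can be sketched without an external command by hashing the file contents with the standard `hashlib` module. The function name `find_duplicates` and the temporary files are assumptions made for this example; directories are skipped explicitly.

```python
import hashlib
import os
import tempfile


def find_duplicates(paths):
    """Return the paths whose contents repeat those of an earlier path."""
    seen = {}
    duplicates = []
    for path in paths:
        if not os.path.isfile(path):
            continue  # skip directories and other non-files
        with open(path, "rb") as handle:
            digest = hashlib.md5(handle.read()).hexdigest()
        if digest in seen:
            duplicates.append(path)
        else:
            seen[digest] = path
    return duplicates


with tempfile.TemporaryDirectory() as tmp:
    samples = (("f_0.txt", "1\n2\n"), ("f_1.txt", "1\n2\n"), ("other.txt", "3\n"))
    for name, text in samples:
        with open(os.path.join(tmp, name), "w") as handle:
            handle.write(text)
    paths = sorted(os.path.join(tmp, name) for name in os.listdir(tmp))
    dupes = [os.path.basename(p) for p in find_duplicates(paths)]

print(dupes)
```

Sorting the paths first makes the result independent of the order in which `os.listdir` returns the entries.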


# **Using Unix Pipes as input**

*  With `subprocess.Popen`, the `stdin` argument of the second process takes the `stdout` of the first process.
*  Below, notice that the first process takes a `stdout` argument of `subprocess.PIPE`.

In [None]:
from subprocess import Popen, PIPE

proc1 = Popen(["process_one.sh"], stdout=PIPE)
Popen(["process_two.sh"], stdin=proc1.stdout)

With `subprocess.run()`, the `input` argument takes the captured `stdout` bytes of another process, as opposed to connecting the streams through the `stdin` argument as with `subprocess.Popen`.

In [None]:
from subprocess import Popen, PIPE, run

proc1 = run(["process_one.sh"], stdout=PIPE)
run(["process_two.sh"], input=proc1.stdout)
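
Since `process_one.sh` and `process_two.sh` are placeholders, here is a runnable sketch of the same `run` pattern. The two child commands are small Python one-liners started through `sys.executable`; they are assumptions standing in for the scripts.

```python
import sys
from subprocess import PIPE, run

# Stand-ins for process_one.sh and process_two.sh
producer = [sys.executable, "-c", "print('alpha'); print('beta')"]
consumer = [sys.executable, "-c", "import sys; print(len(sys.stdin.readlines()))"]

# The first process finishes and its output is held in memory as bytes
first = run(producer, stdout=PIPE)

# Those bytes are then fed to the second process through `input`
second = run(consumer, input=first.stdout, stdout=PIPE)

line_count = int(second.stdout.decode("utf-8").strip())
print(line_count)
```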

The two sides, `Python` objects and `Unix` byte strings, have to be translated back and forth when using the `subprocess` module.

Often in automation, a script waits for a user to input a string. 
*   With the `bash` shell this can be accomplished with the `read` command. 
*   With `Python` this can be accomplished using `input()`.

In more sophisticated scripts in `Python`, a command line library like `click` or `argparse` will be used to pass commands to a `Python` script.
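
As a small illustration of the `argparse` approach, the sketch below defines a parser with one positional argument and one option. The argument names (`directory`, `--ext`) are hypothetical and chosen only for this example.

```python
import argparse


def build_parser():
    """Create a parser for a hypothetical file-counting script."""
    parser = argparse.ArgumentParser(description="Count files in a directory.")
    parser.add_argument("directory", help="directory to inspect")
    parser.add_argument("--ext", default=".csv", help="file extension to count")
    return parser


# Passing a list makes the example runnable without a real command line
args = build_parser().parse_args(["sample_data", "--ext", ".txt"])
print(args.directory, args.ext)
```

In a real script, calling `parse_args()` with no argument reads the values from `sys.argv`.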

### **Counting files in a directory tree**

Use `subprocess.Popen` to pipe the output of the `find` command to `wc -l` and print the number of files in the directory tree.

*  Use `subprocess.Popen` to run the Unix command `find /content/sample_data -type f -print`.
*  Send the output of the `find` command to the input of `wc -l`.
*  Decode the bytes to strings.
*  Strip the output of spaces and print.

In [None]:
from subprocess import Popen, PIPE

# Runs find command to search for files
find = Popen(["find", "/content/sample_data", "-type", "f", "-print"], stdout=PIPE)

# Runs wc and counts the number of lines
word_count = Popen(["wc", "-l"], stdin=find.stdout, stdout=PIPE)

# Print the decoded and formatted output
output = word_count.stdout.read()
print(output.decode("utf-8").strip())


9


Let's see an intermediate output

In [None]:
from subprocess import Popen, PIPE

# Runs find command to search for files
find = Popen(["find", "/content/sample_data", "-type", "f", "-print"], stdout=PIPE)
print(find.stdout.read().decode("utf-8"))

/content/sample_data/README.md
/content/sample_data/anscombe.json
/content/sample_data/f_2.txt
/content/sample_data/f_0.txt
/content/sample_data/f_1.txt
/content/sample_data/california_housing_test.csv
/content/sample_data/mnist_train_small.csv
/content/sample_data/california_housing_train.csv
/content/sample_data/mnist_test.csv
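
When chaining two `Popen` objects, a common refinement is to close the parent's copy of the first process's `stdout` and to collect the result with `communicate()`. The sketch below shows that pattern with Python one-liners standing in for `find` and `wc -l`; those stand-ins are assumptions made so the example runs anywhere.

```python
import sys
from subprocess import Popen, PIPE

# Stand-in for `find`: prints three lines
producer = Popen(
    [sys.executable, "-c", "print('a'); print('b'); print('c')"],
    stdout=PIPE,
)

# Stand-in for `wc -l`: counts the lines it receives on stdin
consumer = Popen(
    [sys.executable, "-c", "import sys; print(sum(1 for _ in sys.stdin))"],
    stdin=producer.stdout,
    stdout=PIPE,
)

# Close our copy so the producer can get SIGPIPE if the consumer exits early
producer.stdout.close()

out, _ = consumer.communicate()
producer.wait()

count = int(out.decode("utf-8").strip())
print(count)
```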



### **Running a health check**

Send the output of an `echo 'python3'` command to a `healthcheck.sh` script.

*  Echo `'python3'` using `subprocess.Popen`.
*  Send the output of the echo subprocess to the "/content/healthcheck.sh" script.
*  Capture the output of `subprocess` that invokes "/content/healthcheck.sh" and use assert to verify `python3` is in the output.

`healthcheck.sh` is created below in `/content`. Note that without the shell command `!chmod 755 healthcheck.sh`, you will see a `PermissionError` if you run this on Colab.

In [None]:
%%file healthcheck.sh
#!/bin/bash
echo "Enter executable to check: "
read executable
path=`which $executable`
echo "Location: $path"

Writing healthcheck.sh


In [None]:
from subprocess import Popen, PIPE
!chmod 755 healthcheck.sh

# equivalent to 'echo "python3"'
echo = Popen(["echo", "python3"], stdout=PIPE)

# equivalent to: echo "python3" | ./healthcheck.sh
path = Popen(["/content/healthcheck.sh"], stdin=echo.stdout, stdout=PIPE)

full_path = path.stdout.read().decode("utf-8")
print(f"...Health Check Output...\n\n {full_path}")

# The assertion will fail if python3 executable path is not found
assert "python3" in full_path


...Health Check Output...

 Enter executable to check: 
Location: /usr/bin/python3



# **Understanding `shell=True` in subprocess**

*  By default, both `Popen` and `run` in `subprocess` have `shell=False`.
*  With this default, the command and its arguments are passed as a `list`, and no shell interprets them.
*  If `shell=True` absolutely must be used, one workaround is to use `shlex` to build the string safely.

#  **Using the `shlex` module**

*   The purpose of `shlex` is to parse `Unix` shell strings safely.
*   This helps ensure that malicious code cannot be run.





In [None]:
import shlex

shlex.split("/content && rm -rf /all/my/dirs")

['/content', '&&', 'rm', '-rf', '/all/my/dirs']
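
When a command string really has to be built for a shell, `shlex.quote` wraps an untrusted value so the shell treats it as a single argument. The sketch below only builds and re-parses the string; it does not run anything.

```python
import shlex

user_input = "/content && rm -rf /all/my/dirs"

# Quote the whole value so "&&" cannot act as a shell operator
quoted = shlex.quote(user_input)
command = f"ls {quoted}"
print(command)

# Parsing the command back shows a single argument after "ls"
tokens = shlex.split(command)
print(tokens)
```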

In [None]:
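import shlex
from subprocess import run

directory = shlex.split("/content")
cmd = ["ls"]
cmd.extend(directory)
# Pass the list with the default shell=False; with shell=True on POSIX,
# only "ls" would be run and "/content" would go to the shell instead.
run(cmd)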

CompletedProcess(args=['ls', '/content'], returncode=0)

### **Safely find directories**

Write a tool that safely processes user input and searches a file system for all directories using `find` and `subprocess.Popen`.

*   Write a script that can safely pass user input to `subprocess.Popen`.
*  Find all of the directories under the path the user entered.
*  Print out all of the directories found.

In [None]:
from subprocess import PIPE, Popen

#Accepts user input
print("Enter a path to search for directories: \n")
user_input = "/content"
print(f"directory to process: {user_input}")

#Pass safe user input into subprocess
with Popen(["find", user_input, "-type", "d"], stdout=PIPE) as find:
    result = find.stdout.readlines()
    
    #Process each line and decode it and strip it
    for line in result:
        formatted_line = line.decode("utf-8").strip()
        print(f"Found Directory: {formatted_line}")


Enter a path to search for directories: 

directory to process: /content
Found Directory: /content
Found Directory: /content/.config
Found Directory: /content/.config/logs
Found Directory: /content/.config/logs/2021.06.01
Found Directory: /content/.config/configurations
Found Directory: /content/sample_data


#  Directory Summarizer

Write a script that calculates the total disk usage of an arbitrary number of directories you pass in. It wraps the Unix `du` command, which you could also run on its own.

*  Use `shlex.split` to safely split a list of directories.
*  Pass the output of `shlex.split` to `subprocess.run`.
*  Print out the results.

In [None]:
from subprocess import run, PIPE
import shlex

print("Enter a list of directories to calculate storage total: \n")
user_input = "/sample_data"

# Sanitize the user input
sanitized_user_input = shlex.split(user_input)
print(f"raw_user_input: {user_input} |  sanitized_user_input: {sanitized_user_input}")

# Safely Extend the command with sanitized input
cmd = ["du", "-sh", "--total"]
cmd.extend(["/content" + dirname for dirname in sanitized_user_input])
print(f"cmd: {cmd}")

# Print the totals out
disk_total = run(cmd, stdout=PIPE)
print(disk_total.stdout.decode("utf-8"))


Enter a list of directories to calculate storage total: 

raw_user_input: /sample_data |  sanitized_user_input: ['/sample_data']
cmd: ['du', '-sh', '--total', '/content/sample_data']
55M	/content/sample_data
55M	total



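The key difference is that `shlex.split` tokenizes input using shell-like rules, so quoted arguments stay together as single items, whereas the regular string `split` method breaks on every run of whitespace. Passing the resulting list to `subprocess` without a shell means characters such as `;` or `&&` are treated as plain text rather than shell syntax. To embed a value in a shell string, use `shlex.quote`.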
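
A small comparison may make this concrete. The input string below is a made-up example containing a quoted directory name with a space.

```python
import shlex

# Hypothetical user input: one quoted path with a space, one plain path
user_input = '"/content/my dir" /content/sample_data'

# str.split breaks on every run of whitespace and keeps the quote characters
plain = user_input.split()

# shlex.split follows shell quoting rules, so the quoted path stays whole
shell_like = shlex.split(user_input)

# shlex.quote wraps a value so a shell would read it as one word
quoted = shlex.quote("my dir; rm -rf /")

print(plain)
print(shell_like)
print(quoted)
```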

# **`os.walk`**

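* `os.walk` yields a 3-tuple for every directory it visits:
  *  root (the path of the current directory)
  *  directories (the names of its subdirectories)
  *  files (the names of its files)
*  The object returned by `os.walk` is a Python `generator`.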
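
Because the walk is top-down by default, the `directories` list can be edited in place to skip subtrees. The sketch below builds a throwaway tree in a temporary directory (the file and directory names are invented for illustration) and prunes hidden directories such as `.config`.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as top:
    # Build a tiny tree: one hidden directory and one regular directory
    os.makedirs(os.path.join(top, ".config", "logs"))
    os.makedirs(os.path.join(top, "sample_data"))
    open(os.path.join(top, ".config", "hidden.csv"), "w").close()
    open(os.path.join(top, "sample_data", "a.csv"), "w").close()

    visited = []
    for root, dirs, files in os.walk(top):
        # Editing dirs in place tells os.walk not to descend into hidden directories
        dirs[:] = [d for d in dirs if not d.startswith(".")]
        for name in files:
            visited.append(os.path.relpath(os.path.join(root, name), top))

print(visited)
```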



In [None]:
import os
f = os.walk("/content")
next(f)

('/content', ['.config', 'sample_data'], ['healthcheck.txt', 'healthcheck.sh'])

In [None]:
import os
fullpath = "/content/sample_data/mnist_test.csv"
_, ext = os.path.splitext(fullpath)
print(_)
print(ext)

/content/sample_data/mnist_test
.csv


### **Find the matching files**

The code below searches for files that match a specific pattern under `/content`, which includes the `sample_data` directory.

*  Walk the file system starting at `/content`.
*  Create the full path to the file by using `os.path.join()`.
*  Match the extension pattern `.csv` using the `os.path.splitext()` function and append matches to a `list`.
*  Print the matches you find.

The code below could be made even more sophisticated by taking advantage of other Python features like looking for regular expressions in file content or performing machine learning on each file.

In [None]:
import os

# Walk the filesystem starting at the /content
matches = []
for root, _, files in os.walk('/content'):
    for name in files:
        # Create the full path to the file by using os.path.join()
        fullpath = os.path.join(root, name)
        print(f"Processing file: {fullpath}")
        # Split off the extension and discard the rest of the path
        _, ext = os.path.splitext(fullpath)
        # Match the extension pattern .csv
        if ext == ".csv":
            matches.append(fullpath)
            
# Print the matches you find          
print("\n", matches)


Processing file: /content/healthcheck.txt
Processing file: /content/healthcheck.sh
Processing file: /content/.config/.last_survey_prompt.yaml
Processing file: /content/.config/active_config
Processing file: /content/.config/config_sentinel
Processing file: /content/.config/.last_opt_in_prompt.yaml
Processing file: /content/.config/.last_update_check.json
Processing file: /content/.config/gce
Processing file: /content/.config/logs/2021.06.01/13.39.46.598129.log
Processing file: /content/.config/logs/2021.06.01/13.40.26.776675.log
Processing file: /content/.config/logs/2021.06.01/13.40.44.102103.log
Processing file: /content/.config/logs/2021.06.01/13.40.43.432793.log
Processing file: /content/.config/logs/2021.06.01/13.40.21.024150.log
Processing file: /content/.config/logs/2021.06.01/13.40.05.370650.log
Processing file: /content/.config/configurations/config_default
Processing file: /content/sample_data/README.md
Processing file: /content/sample_data/anscombe.json
Processing file: /con

### **Rename files**

Rename all of the files in the directory by replacing the prefix `'f_'` with `'file_'`.  Remember that the name variable will need to be split to be renamed.

*  Use `os.walk` to traverse the `/content` directory.
*  Use `pathlib` to rename all files with `'f_'` to `'file_'`.

In [None]:
import pathlib
import os

# Walk the filesystem starting at /content
for root, _, files in os.walk('/content'):
    for name in files:

        # Create the full path to the file by using os.path.join()
        fullpath = os.path.join(root, name)
        print(f"Processing file: {fullpath}")

        # Rename file (this part was changed slightly from the exercise)
        if name.startswith("f_"):
            p = pathlib.Path(fullpath)
            old_name = name.split("_", 1)[1] # Split the name at the first underscore
            new_name = f"file_{old_name}"
            print(f"Renaming file {name} to {new_name}")
            # Rename in place; a bare file name would move the file to the working directory
            p.rename(p.with_name(new_name))

            # The original exercise did this instead:
            #  shortname = name.split("_")[0] # You need to split the name by underscore
            #  new_name = f"{shortname}_longhorn"

Processing file: /content/healthcheck.txt
Processing file: /content/healthcheck.sh
Processing file: /content/.config/.last_survey_prompt.yaml
Processing file: /content/.config/active_config
Processing file: /content/.config/config_sentinel
Processing file: /content/.config/.last_opt_in_prompt.yaml
Processing file: /content/.config/.last_update_check.json
Processing file: /content/.config/gce
Processing file: /content/.config/logs/2021.06.01/13.39.46.598129.log
Processing file: /content/.config/logs/2021.06.01/13.40.26.776675.log
Processing file: /content/.config/logs/2021.06.01/13.40.44.102103.log
Processing file: /content/.config/logs/2021.06.01/13.40.43.432793.log
Processing file: /content/.config/logs/2021.06.01/13.40.21.024150.log
Processing file: /content/.config/logs/2021.06.01/13.40.05.370650.log
Processing file: /content/.config/configurations/config_default
Processing file: /content/sample_data/README.md
Processing file: /content/sample_data/anscombe.json
Processing file: /con

Use the `os.walk` function to find serialized models and test them.

*   Walk the file system path `/content` (the exercise uses `my`) using `os.walk`.
*   Look for a file with the extension `.joblib` and load the model into `clf` using joblib's `load()` function.
*   Apply the unpickled model to the input data `X_digits`. The exercise calls `clf.predict()`; the object saved here is a fitted `PCA`, so the code below calls `clf.transform()` instead.
*  Print the result.

In [None]:
%%file pca.py

from sklearn import linear_model, decomposition, datasets
from sklearn.pipeline import Pipeline
import joblib  # sklearn.externals.joblib was removed from newer scikit-learn releases
import numpy as np

logistic = linear_model.LogisticRegression()
pca = decomposition.PCA()
pipe = Pipeline(steps=[('pca', pca), ('logistic', logistic)])
digits = datasets.load_digits()
X_digits = digits.data
y_digits = digits.target
pca.fit(X_digits)

## Pickle out Model
joblib.dump(pca, '/content/digits_prediction.joblib')


Writing pca.py


The code below could not be run here because the `.joblib` file could not be copied.

In [None]:
import os
import joblib  # sklearn.externals.joblib was removed from newer scikit-learn releases
from pca import X_digits # The exercise imports from my.models.pca; this depends on how the directories are laid out

clf = None
# Walk the filesystem starting at the /content path
for root, _, files in os.walk('/content'):
    for name in files:
        # Create the full path to the file by using os.path.join()
        fullpath = os.path.join(root, name)
        print(f"Processing file: {fullpath}")
        _, ext = os.path.splitext(fullpath)
        # Match the extension pattern .joblib
        if ext == ".joblib":
            clf = joblib.load(fullpath)
            break
    if clf is not None:
        break  # Stop walking once a model has been loaded

# Apply the unpickled PCA model to the data
if clf is not None:
    print(clf.transform(X_digits))


#  **`pathlib.Path.glob()`**
`pathlib.Path.glob()` can

  *  find patterns in directories
  *  yield matches
  *  recursively search directories (when the pattern starts with `**/`)

## Simple **`pathlib.Path.glob()`** pattern

In [None]:
from pathlib import Path

path = Path("/content/sample_data/")
list(path.glob("*.csv"))

[PosixPath('/content/sample_data/california_housing_test.csv'),
 PosixPath('/content/sample_data/mnist_train_small.csv'),
 PosixPath('/content/sample_data/california_housing_train.csv'),
 PosixPath('/content/sample_data/mnist_test.csv')]

## **Recursive `pathlib.Path.glob()` patterns**

In [None]:
from pathlib import Path

path = Path("/content")
list(path.glob("**/*.csv"))

[PosixPath('/content/sample_data/california_housing_test.csv'),
 PosixPath('/content/sample_data/mnist_train_small.csv'),
 PosixPath('/content/sample_data/california_housing_train.csv'),
 PosixPath('/content/sample_data/mnist_test.csv')]
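
To see the difference between the two patterns side by side, here is a small sketch on a temporary directory. The file names are invented for illustration. It also shows `Path.rglob`, which behaves like `glob` with `**/` added in front of the pattern.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    top = Path(tmp)
    (top / "sample_data").mkdir()
    (top / "top_level.csv").write_text("a,b\n")
    (top / "sample_data" / "nested.csv").write_text("a,b\n")

    # Non-recursive: only the directory itself is searched
    shallow = sorted(p.name for p in top.glob("*.csv"))

    # Recursive: "**/" also matches files in subdirectories
    recursive = sorted(p.name for p in top.glob("**/*.csv"))

    # rglob("*.csv") is shorthand for glob("**/*.csv")
    with_rglob = sorted(p.name for p in top.rglob("*.csv"))

print(shallow)
print(recursive)
print(with_rglob)
```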

#  **Using `os.walk` to find patterns**
For each directory it visits, `os.walk` returns

*   the directory path
*   the directory names
*   the filenames

In [None]:
import os
result = os.walk("/content")
# consume the generator
print(next(result))
print(next(result))
print(next(result))
print(next(result))
print(next(result))
print(next(result))


('/content', ['.config', 'sample_data'], [])
('/content/.config', ['logs', 'configurations'], ['.last_survey_prompt.yaml', 'active_config', 'config_sentinel', '.last_opt_in_prompt.yaml', '.last_update_check.json', 'gce'])
('/content/.config/logs', ['2021.06.01'], [])
('/content/.config/logs/2021.06.01', [], ['13.39.46.598129.log', '13.40.26.776675.log', '13.40.44.102103.log', '13.40.43.432793.log', '13.40.21.024150.log', '13.40.05.370650.log'])
('/content/.config/configurations', [], ['config_default'])
('/content/sample_data', [], ['README.md', 'anscombe.json', 'california_housing_test.csv', 'mnist_train_small.csv', 'california_housing_train.csv', 'mnist_test.csv'])


#  **Using `fnmatch.fnmatch()`**
*  Tests whether a filename matches a Unix shell-style wildcard pattern and returns `True` or `False`
*  The pattern can be converted to a regular expression

In [None]:
from fnmatch import fnmatch

filename = "/content/sample_data/mnist_test.csv"
if fnmatch(filename, "*.csv"):
    print(f"Found match {filename}")

#  **Converting fnmatch to regular expression**
*   The `fnmatch.translate()` function takes a Unix wildcard pattern and translates it into a regular expression.
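
As a quick sanity check, the translated regular expression should select the same names as `fnmatch.filter`. The list of names below is made up for this sketch.

```python
import fnmatch
import re

# Hypothetical file names; "notes.csv.bak" only contains ".csv" in the middle
names = ["data1.csv", "script.py", "notes.csv.bak", "data2.csv"]

# Compile the wildcard once, then reuse it for every name
csv_regex = re.compile(fnmatch.translate("*.csv"))
regex_matches = [name for name in names if csv_regex.match(name)]

# The same selection using the wildcard directly
filter_matches = fnmatch.filter(names, "*.csv")

print(regex_matches)
print(filter_matches)
```

Compiling the pattern up front can be convenient when the same wildcard is applied to many names or combined with other regular expressions.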

In [None]:
import fnmatch, re

regex = fnmatch.translate('*.csv')
pattern = re.compile(regex)
print(pattern)

re.compile('(?s:.*\\.csv)\\Z')


In [None]:
pattern.match("/content/sample_data/mnist_test.csv")

<re.Match object; span=(0, 35), match='/content/sample_data/mnist_test.csv'>

The code below does the following:
*  Use `pathlib.Path` to make a `Path` object for the `/content/sample_data` directory.
*  Use `.glob` on the Path object to filter the `.csv` pattern.
*  Print out all matches to the `.glob` pattern.

In [None]:
from pathlib import Path
import os

path = Path("/content/sample_data")
matches = sorted(path.glob("*.csv"))
for match in matches:
    print(f"Found .csv file in sample_data: {match}")

Found .csv file in sample_data: /content/sample_data/california_housing_test.csv
Found .csv file in sample_data: /content/sample_data/california_housing_train.csv
Found .csv file in sample_data: /content/sample_data/mnist_test.csv
Found .csv file in sample_data: /content/sample_data/mnist_train_small.csv


The code below uses the `fnmatch.filter` function to filter for `csv` files from a list of files.

In [None]:
import fnmatch

# List of file names to process
files = ["data1.csv", "script.py", "image.png", "data2.csv", "all.py"]

# Function that returns the file names matching *.csv
def csv_matches(list_of_files):
    """Return matches for csv files"""

    matches = fnmatch.filter(list_of_files, "*.csv")
    return matches

# Call function to find matches
matches = csv_matches(files)
print(f"Found matches: {matches}")


Found matches: ['data1.csv', 'data2.csv']


#   **Using `shutil.copytree`**

*  Can copy an entire tree of files and directories recursively. 
*  Can ignore patterns 

```
from shutil import copytree, ignore_patterns

copytree(source, destination, ignore=ignore_patterns('*.txt',
'*.excel'))
```
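
The snippet above leaves `source` and `destination` undefined. A runnable sketch, using a temporary directory and invented file names, might look like this:

```python
import os
import tempfile
from shutil import copytree, ignore_patterns

with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "sometree")
    destination = os.path.join(tmp, "newtree")  # must not exist yet

    # Build a small tree containing one .py file and two .txt files
    os.makedirs(os.path.join(source, "nested"))
    for name in ("keep.py", "skip.txt", os.path.join("nested", "also_skip.txt")):
        open(os.path.join(source, name), "w").close()

    # Copy everything except files matching *.txt
    copytree(source, destination, ignore=ignore_patterns("*.txt"))

    # Collect the files that ended up in the copy
    copied = sorted(
        os.path.relpath(os.path.join(root, name), destination)
        for root, _, files in os.walk(destination)
        for name in files
    )

print(copied)
```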





In [None]:
from shutil import copytree

!pwd
# A directory and a file are created. 
!mkdir sometree && touch sometree/somefile.txt

# The copy tree function is used to copy the entire tree to a destination called newtree.
copytree("sometree", "newtree")

!ls -l newtree/

/content
total 0
-rw-r--r-- 1 root root 0 Jun 14 07:54 somefile.txt


# **Safely deleting a tree of data by `shutil.rmtree`**

```
from shutil import rmtree

rmtree(source)
```
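
`shutil.rmtree` takes the path of the tree to delete, and the deletion cannot be undone. A cautious way to try it out is on a scratch directory, as in this sketch (the directory names are placeholders):

```python
import os
import tempfile
from shutil import rmtree

# Create a scratch parent directory and a small tree inside it
parent = tempfile.mkdtemp()
doomed = os.path.join(parent, "sometree")
os.makedirs(os.path.join(doomed, "nested"))
open(os.path.join(doomed, "nested", "somefile.txt"), "w").close()

existed_before = os.path.exists(doomed)

# Remove the directory and everything below it
rmtree(doomed)
exists_after = os.path.exists(doomed)

# Clean up the scratch parent as well
rmtree(parent, ignore_errors=True)

print(existed_before, exists_after)
```
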
# **Safely archiving a tree by `shutil.make_archive`**

```
from shutil import make_archive

make_archive("somearchive", "gztar", "inside_tmp_dir")
```
*  `gzip` is used to compress files
*  `tar` bundles multiple items into a package 
*   When both are used together it creates a compressed archive.

```
'/tmp/somearchive.tar.gz'
```
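
To check what actually went into an archive, it can be reopened with the `tarfile` module. The sketch below assumes a throwaway directory named `inside_tmp_dir` holding a single made-up file.

```python
import os
import tarfile
import tempfile
from shutil import make_archive

with tempfile.TemporaryDirectory() as tmp:
    # Build the tree that will be archived
    tree = os.path.join(tmp, "inside_tmp_dir")
    os.makedirs(tree)
    with open(os.path.join(tree, "somefile.txt"), "w") as f:
        f.write("hello\n")

    # base_name has no extension; make_archive adds .tar.gz for "gztar"
    base_name = os.path.join(tmp, "somearchive")
    archive_path = make_archive(base_name, "gztar", root_dir=tree)

    # Reopen the archive and list its members
    with tarfile.open(archive_path) as tar:
        members = tar.getnames()

print(os.path.basename(archive_path))
print(members)
```

`make_archive` returns the full path of the file it wrote, which is handy when the script needs to pass the archive on to another step.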





### **Create temp file.**

* Use `tempfile.NamedTemporaryFile` to create a named temporary file.
* Write a message to it and verify it was written.
* Verify temporary file was destroyed using `os.path.exists`.

In [None]:
import tempfile
import os

# Create a self-destructing temporary file
with tempfile.NamedTemporaryFile() as exploding_file:
    # This file will be deleted automatically after the with statement block
    print(f"Temp file created: {exploding_file.name}")
    exploding_file.write(b"This message will self-destruct in 5....4...\n")
    
    # Get to the top of the file
    exploding_file.seek(0)

    #Print the message
    print(exploding_file.read())

# Check to make sure the file self-destructed
if not os.path.exists(exploding_file.name): 
    print(f"self-destruction verified: {exploding_file.name}")


Temp file created: /tmp/tmpxejj1kzw
b'This message will self-destruct in 5....4...\n'
self-destruction verified: /tmp/tmpxejj1kzw


### **Archive users**

Write an automation script that will archive all user folders and email them the archived copy. 

Use the `shutil.make_archive` function to archive a user directory. You will create two archive types: `gztar` and `zip`.

*  `apath` is the path where the tree is archived, and the string arguments "`zip`" and "`gztar`" select the two archive formats.
*  Archive the user folder copied to the location `/content`.
*  Create a gzipped `tar` archive and a `zip` archive.
*  Print both archive files out using `os.listdir()`.

In [None]:
!mkdir /content/archive

In [None]:
from shutil import make_archive
import os

# Archive root
username = "user1"
root_dir = "/content"
apath = "/content/archive"

# Archive base
final_archive_base = f"{apath}/{username}"

# Create tar and gzipped archive
make_archive(final_archive_base, "gztar", apath)

# Create zip archive
make_archive(final_archive_base, "zip", apath)

# Print out archives
print(os.listdir(apath))

['user1.tar.gz', 'user1.zip']


In [None]:
final_archive_base

'/content/archive/user1'

# **`pathlib.Path`**
*  The `pathlib.Path.glob()` method with the pattern `"*"` lists all of the items at the top level of the directory.

In [None]:
from pathlib import Path

path = Path("/content")
list(path.glob("*"))

[PosixPath('/content/.config'),
 PosixPath('/content/sometree'),
 PosixPath('/content/newtree'),
 PosixPath('/content/archive'),
 PosixPath('/content/sample_data')]

In [None]:
from pathlib import Path

path = Path("/content")
# show the current working directory (cwd() is a class method, independent of this path)
print(path.cwd())

# show whether the path actually exists on disk
print(path.exists())

# return the path as a string with forward slashes
print(path.as_posix())

/content
True
/content
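
Path objects also expose the pieces of a path as attributes, which can stand in for `os.path.splitext`. A brief sketch, using `PurePosixPath` so that it does not touch the file system:

```python
from pathlib import PurePosixPath

fullpath = PurePosixPath("/content/sample_data/mnist_test.csv")

parent = str(fullpath.parent)   # directory part
name = fullpath.name            # file name with extension
stem = fullpath.stem            # file name without extension
suffix = fullpath.suffix        # extension, including the dot

# Build a sibling path with a different extension
as_json = str(fullpath.with_suffix(".json"))

print(parent, name, stem, suffix, as_json)
```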


# **Open a file with `pathlib`**

In [None]:
from pathlib import Path
some_file = Path("/content/sample_data/README.md")

# Print the last line
with some_file.open() as file_to_read:
    print(file_to_read.readlines()[-1:])

['    [vega_datasets library](https://github.com/altair-viz/vega_datasets/blob/4f67bdaad10f45e3549984e17e1b3088c731503d/vega_datasets/_data/anscombe.json).\n']


#  **Create a directory with `pathlib`**

* Once the `Path` object has been created, the `mkdir` method can be used to create a directory.

In [None]:
from pathlib import Path

tmp = Path("/content/temporary_folder")
tmp.mkdir()
!ls -l

total 20
drwxr-xr-x 2 root root 4096 Jun 14 08:31 archive
drwxr-xr-x 2 root root 4096 Jun 14 07:54 newtree
drwxr-xr-x 1 root root 4096 Jun  1 13:40 sample_data
drwxr-xr-x 2 root root 4096 Jun 14 07:54 sometree
drwxr-xr-x 2 root root 4096 Jun 14 09:48 temporary_folder


# **Write text with `pathlib`**

* Can create a file that does not yet exist and write something.

In [None]:
from pathlib import Path

write_path = Path("/content/some_random_file.txt")
write_path.write_text("Wow")
print(write_path.read_text())

Wow


# **Rename a file with `pathlib`**

*   Renaming files in a script
*   Walk a file system and rename files that match a specific pattern
*   The `rename` method of a `Path` object is used to rename the file

In [None]:
from pathlib import Path

# Create a Path object
modify_file = Path("/content/some_random_file.txt")

#rename file
modify_file.rename("/content/some_random_file_renamed.txt")

!ls /content

archive  sample_data		       sometree
newtree  some_random_file_renamed.txt  temporary_folder


### **Find whether files exist**

* Read in a `posts_index.txt` file to find paths.
* Create `pathlib.Path` objects and use `.exists()` to check if the path is valid.
* Use `post.strip()` to clean up each path before you pass it to `pathlib.Path`.

In [None]:
!mkdir /content/socialposts
!mkdir /content/socialposts/2019
!mkdir /content/socialposts/2019/june

In [None]:
# Create files for this exercise

from pathlib import Path

Path("/content/socialposts/2019/june/post1.txt").write_text("")
Path("/content/socialposts/2019/june/post3.txt").write_text("")
Path("/content/socialposts/2019/june/post4.txt").write_text("")
Path("/content/socialposts/2019/june/post9.txt").write_text("")

0

In [None]:
# Create posts_index.txt

with open('posts_index.txt', 'w') as f:
    for i in range(1, 11):
        f.write(f"/content/socialposts/2019/june/post{i}.txt\n")  


In [None]:
import pathlib

# Read the index of social media posts
with open("/content/posts_index.txt") as posts:
    for post in posts.readlines():
        
        # Create a pathlib object
        path = pathlib.Path(post.strip())
        
        # Check if the social media post still exists on disk
        if path.exists():
            print(f"Found active post: {post}")
        else:
            print(f"Post is missing: {post}")


Found active post: /content/socialposts/2019/june/post1.txt

Post is missing: /content/socialposts/2019/june/post2.txt

Found active post: /content/socialposts/2019/june/post3.txt

Found active post: /content/socialposts/2019/june/post4.txt

Post is missing: /content/socialposts/2019/june/post5.txt

Post is missing: /content/socialposts/2019/june/post6.txt

Post is missing: /content/socialposts/2019/june/post7.txt

Post is missing: /content/socialposts/2019/june/post8.txt

Found active post: /content/socialposts/2019/june/post9.txt

Post is missing: /content/socialposts/2019/june/post10.txt



Create an integration script that creates several Python files and writes Python code to them. After that, run them all with `python3` and `subprocess` to obtain the scripts' output.

*  Use `Path.write_text` to create and write the files.
*  Use `Path` and `.glob` to find these files.
*  Run all the matching `python` scripts using `subprocess.run` method.

In [None]:
from subprocess import run, PIPE
from pathlib import Path

# Create the python files
for i in range(3):
    path = Path(f"/content/file_{i}.py")
    # Write both lines in one call; a second write_text would overwrite the first
    path.write_text("#!/usr/bin/env python\nimport datetime;print(datetime.datetime.now())")
  

# Find all the python files you created and print them out
for file in Path("/content").glob("*.py"):
   # Gets the resolved full path
   fullpath = str(file.resolve())
   proc = run(["python3", fullpath], stdout=PIPE)
   print(proc)


CompletedProcess(args=['python3', '/content/file_0.py'], returncode=0, stdout=b'2021-06-14 10:27:58.216740\n')
CompletedProcess(args=['python3', '/content/file_2.py'], returncode=0, stdout=b'2021-06-14 10:27:58.274268\n')
CompletedProcess(args=['python3', '/content/file_1.py'], returncode=0, stdout=b'2021-06-14 10:27:58.323932\n')


# **Decorators**

*  Below is an example of how to write a decorator that 
  *  prints out the `time` that the function it wraps takes to execute. 
  *  prints out the `name` of the function and the `arguments` and `keyword arguments` that are passed in.

*  The decorator uses `wraps` from `functools`; without it, the metadata of the wrapped function (such as its name and docstring) could not be printed.


In [None]:
from functools import wraps
import time

def instrument(f):
    @wraps(f)
    def wrap(*args, **kwargs):
        ts = time.time()
        result = f(*args, **kwargs)
        te = time.time()
        print(args)    # positional arguments, as a tuple
        print(kwargs)  # keyword arguments, as a dict
        print(f"function: {f.__name__}, args: [{args}, {kwargs}] took: {te-ts} sec")
        return result
    return wrap

@instrument
def lazy_work(x,y, sleep=1):
    """Sleeps then works"""
    time.sleep(sleep)
    return x+y

lazy_work(4,9, sleep=2)
lazy_work.__doc__

(4, 9)
{'sleep': 2}
function: lazy_work, args: [(4, 9), {'sleep': 2}] took: 2.002094030380249 sec


'Sleeps then works'

# **How does a decorator work?**

In [None]:
from functools import wraps

def do_nothing_decorator(f):
    @wraps(f)
    def wrapper(*args, **kwds):
        print('INSIDE DECORATOR: This is called before function')
        return f(*args, **kwds)
    return wrapper

@do_nothing_decorator
def hello_world():
    """This is a hello world function"""
    print("Hello World Function")

hello_world()


INSIDE DECORATOR: This is called before function
Hello World Function


In [None]:
# Name and docstring are preserved
print(f"Function Name: {hello_world.__name__}")
print(f"Function Docstring: {hello_world.__doc__}")

Function Name: hello_world
Function Docstring: This is a hello world function
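
For contrast, here is a sketch of what happens when `@wraps` is left out. The decorator and function names are invented for this comparison.

```python
from functools import wraps

def bare_decorator(f):
    # No @wraps: the returned function keeps its own name and docstring
    def wrapper(*args, **kwds):
        return f(*args, **kwds)
    return wrapper

def wrapped_decorator(f):
    @wraps(f)  # copies __name__, __doc__ and other metadata from f
    def wrapper(*args, **kwds):
        return f(*args, **kwds)
    return wrapper

@bare_decorator
def hello_bare():
    """Docstring that gets hidden"""

@wrapped_decorator
def hello_wrapped():
    """Docstring that is preserved"""

print(hello_bare.__name__, hello_bare.__doc__)
print(hello_wrapped.__name__, hello_wrapped.__doc__)
```

Without `@wraps`, the decorated function reports the name `wrapper` and has no docstring, which makes logs and integration tests that print function names harder to read.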


Write an integration test that verifies that your cloud environment can run `KMeans` clustering algorithms.

Write two functions that make your code ready to be run by a command line tool library.

*  Create a function that makes blobs using the `sklearn.datasets.make_blobs` function (older scikit-learn releases also exposed it as `sklearn.datasets.samples_generator.make_blobs`). These blobs are sample data that will be used to create clusters. It accepts `n_samples`, `centers`, `random_state`.
*  Create a function that performs `KMeans` clustering. 

In [None]:
from sklearn.datasets import make_blobs  # samples_generator was removed in newer scikit-learn releases
from sklearn.cluster import KMeans

# Create sample blobs from sklearn datasets
def blobs():
    X, y = make_blobs(n_samples=10, centers=3, n_features=2,random_state=0)
    return X,y
  
# Perform KMeans cluster
def cluster(X, random_state=170, num=2):
    return KMeans(n_clusters=num, random_state=random_state).fit_predict(X) # Returns cluster assignment

# Run everything:  Call both functions. blobs creates the data and cluster clusters the data.
def main():
    X,_ = blobs()
    return cluster(X)

# Print the KMeans cluster assignments
print(main()) 




[1 1 0 1 1 0 1 0 0 1]


Create an integration test that loops over decorators you create and prints out the names of the decorated functions. Use this approach to verify two decorated function names.

* Decorate two functions with the decorator `@nothing`, which the exercise imports for you and which is defined here at the top of the cell.
* Put both uncalled functions into a list.
* Print the name of each function by using a `for` loop to pull them out of a list.

In [None]:
from functools import wraps

def nothing(f):
    @wraps(f)
    def wrapper(*args, **kwds):
        return f(*args, **kwds)
    return wrapper

# Decorate first function
@nothing
def something():
    pass

# Decorate second function
@nothing
def another():
    pass

# Put uncalled function into a list and print name  
funcs = [something, another]
for func in funcs:
    print(f"function name: {func.__name__}")

function name: something
function name: another


# **Debugging decorator**

Write a debugging decorator that interns can use that will print out both the arguments and the keyword arguments when they are applied to a function. 

* Create a decorator that prints the `*args` and `**kw` arguments passed into it.
* Apply that decorator to the `mult` function and run it.

In [None]:
from functools import wraps

# Create decorator
def debug(f):
    @wraps(f)
    def wrap(*args, **kw):
        result = f(*args, **kw)
        print(f"function name: {f.__name__}, args: [{args}], kwargs: [{kw}]")
        return result
    return wrap

# Apply decorator
@debug
def mult(x, y=10):
    return x*y
print(mult(5, y=5))


function name: mult, args: [(5,)], kwargs: [{'y': 5}]
25


#  **Use `sys.argv`**

`sys.argv` is a list in the `sys` module that holds the command-line arguments passed to a script; the first item is the script name.

In [None]:
import sys
print(sys.argv)

['/usr/local/lib/python3.7/dist-packages/ipykernel_launcher.py', '-f', '/root/.local/share/jupyter/runtime/kernel-ff012712-9cf5-4ed4-bf06-71fe19ec66cb.json']


#  **Writing a script with `sys.argv`**

In [None]:
%%file hello_argv.py
import sys
def hello(user_input):
    print(f"From a user: {user_input}")

if __name__ == "__main__":
    arg1 = sys.argv[1]
    print(f"sys.argv = {sys.argv}")
    hello(arg1)

Overwriting hello_argv.py


#  **Parsing input from script**

To call the script that was created from the command line:

1.   The Python interpreter is invoked first (shell command `python`).
2.   Next comes the name of the script.
3.   Last comes the input to be passed to the script.
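
The same three parts can be seen from Python itself. In this sketch a one-line program given with `-c` stands in for the script (an assumption made to keep the example self-contained), and `sys.executable` is used as the interpreter.

```python
import subprocess
import sys

# One-line stand-in for a script: print everything after the program name
code = "import sys; print(sys.argv[1:])"

# interpreter, then the program, then the inputs passed to it
proc = subprocess.run(
    [sys.executable, "-c", code, "something", "another"],
    stdout=subprocess.PIPE,
)
output = proc.stdout.decode("utf-8").strip()
print(output)
```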



In [None]:
!python hello_argv.py something

sys.argv = ['hello_argv.py', 'something']
From a user: something


In [None]:
!python hello_argv.py another

sys.argv = ['hello_argv.py', 'another']
From a user: another


### **Using python command-line tools**

There is a command-line executable written in Python called `findit.py`.  It reads up to two arguments from `sys.argv`.

*  The first argument is the path to search (e.g. `/some/path`).
*  The second is an optional argument that selects a file extension (e.g. `.pdf`). 

Use this script to search for files with the extension `.csv` in the `/content/sample_data`.

The `findit.py` is created below.


In [5]:
%%file findit.py
import os

# Searches files and finds matches
def find_files(path, extension=".csv"):
    matches = []
    for root, _, files in os.walk(path):
        for name in files:
            fullpath = os.path.join(root,name)
            _ , ext = os.path.splitext(fullpath)
            if ext == extension:
                matches.append(fullpath)
    return matches

# block runs if called as a script
if __name__ == "__main__":
    import sys
    args = sys.argv
    print(f"Arguments to sys.arg: {args}")
    # block to parse arguments
    if not len(args) > 1:
        print(f"Pass in path and/or extension:  findit.py /some/path .csv")
        sys.exit(1)
    if len(args) == 3:
        result = find_files(path=args[1], extension=args[2])
    elif len(args) == 2:
        result = find_files(path=args[1])
    else:
        print(f"Wrong number of arguments[{len(args)}] takes max of two")
        sys.exit(1)
    print(f"Found matches: {result}")

Overwriting findit.py


In [6]:
!python findit.py sample_data .csv

Arguments to sys.arg: ['findit.py', 'sample_data', '.csv']
Found matches: ['sample_data/california_housing_train.csv', 'sample_data/mnist_test.csv', 'sample_data/california_housing_test.csv', 'sample_data/mnist_train_small.csv']


`reverseit.py` reverses all lines in a file and prints them out one by one.

* Write two lines to a file `input.txt`.
* Use the `subprocess` module to run the `/content/reverseit.py` script with `input.txt` as an argument.
*  Print out the results.

The `reverseit.py` is created below.

In [7]:
%%file reverseit.py
import fileinput

input_sent = fileinput.input()
for line in input_sent:
    reversed_line = line[::-1]
    print(reversed_line)

Overwriting reverseit.py


In [8]:
from subprocess import Popen, PIPE

# Write a file
with open("input.txt", "w") as input_file:
    input_file.write("Reverse this string\n")
    input_file.write("Reverse this too!")

# Runs python script that reverse strings in a file line by line
run_script = Popen(["python", "reverseit.py", "input.txt"], stdout=PIPE)
                          
# Print out the script output
for line in run_script.stdout.readlines():
    print(line.decode("utf-8"))




gnirts siht esreveR

!oot siht esreveR



# **What is click?**

`click` is a Python package for creating beautiful command line interfaces.
* The `@click.command` decorator turns a function into a command. 
* The `@click.option` decorator creates a command line option, here `--phrase`, that will prompt the user for input if it is not supplied.
* The `click.echo` function emits strings to `stdout`; in this example it prints the tokenized phrase.

In [9]:
%%file hello_click.py
import click

@click.command()
@click.option('--phrase', prompt='Enter a phrase',help='test')
def tokenize(phrase):
    """tokenize phrase"""
    click.echo(f"tokenized phrase: {phrase.split()}")

if __name__ == '__main__':
    tokenize()

Overwriting hello_click.py


In [14]:
!python hello_click.py --phrase "test t  s "
!python hello_click.py

tokenized phrase: ['test', 't', 's']
Enter a phrase: without phrase flag
tokenized phrase: ['without', 'phrase', 'flag']




```
import sys
sys.argv=['']
del sys
```
As far as I understand, when running directly in Jupyter these commands are needed to empty `sys.argv` first; when running from a shell command they are not required.

Write code that does the following:

*  Use `random.choice` to select from cities in the `values` list.
*  Print the `result` to stdout using `click`'s `echo` function.


In [7]:
import click
import random
random.seed(42)

# Create random values to choose from
values = ["Nashville", "Austin", "Denver", "Cleveland"]

# Select a random choice
result = random.choice(values)

# Print the random choice using click echo
click.echo(f"My choice is: {result}")


My choice is: Nashville


Take `cluster.py` that performs `KMeans` clustering and execute it with two different options: `help` and `num`. Run this inside of `subprocess.run` and print both outputs to standard out.

* Run the `cluster.py` click application to retrieve a help message using the help flag.
* Run the `cluster.py` click application with `num` option of `2`.
* Assign the output of the first subprocess to `help_out` and print `stdout`.
* Assign the output of the second subprocess to `cluster2` and print `stdout`.

The file `cluster.py` is created below.

In [15]:
%%file cluster.py
#!/usr/bin/env python3
import click
from sklearn.datasets import make_blobs  # samples_generator was removed in newer scikit-learn releases
from sklearn.cluster import KMeans

def blobs():
    X, y = make_blobs(n_samples=10, centers=3, n_features=2,random_state=0)
    return X,y

def cluster(X, random_state=170, num=2):
    return KMeans(n_clusters=num, random_state=random_state).fit_predict(X)

def main(num):
    X,_ = blobs()
    return(cluster(X,num=num))

@click.command()
@click.option("--num", default=2, help="Number of clusters")
def run_cluster(num):
    result = main(num)
    click.echo(f'Cluster assignments: {result} for total clusters [{num}]')

    
if __name__ == '__main__':
    # pylint: disable=no-value-for-parameter
    run_cluster()


Writing cluster.py


In [20]:
from subprocess import run, PIPE

# Run help for click tool
help_out = run(["python", "./cluster.py", "--help"], stdout=PIPE)

# Run cluster
cluster2 = run(["python", "./cluster.py", "--num", "2"], stdout=PIPE)

# Print help
print(help_out.stdout.decode("utf-8"))

# Print cluster output
print(cluster2.stdout.decode("utf-8"))


Usage: cluster.py [OPTIONS]

Options:
  --num INTEGER  Number of clusters
  --help         Show this message and exit.

Cluster assignments: [1 1 0 1 1 0 1 0 0 1] for total clusters [2]
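
The `subprocess.run` calls above pass `stdout=PIPE` and decode the bytes by hand. A minimal sketch of an equivalent pattern follows, assuming Python 3.7 or later (for `capture_output`) and using a throwaway `-c` one-liner in place of `cluster.py` so that it runs on its own. Launching the child with `sys.executable` instead of the bare name `python` is a design choice made here, not something the original cell does; it keeps the child in the same environment as the notebook.

```python
import subprocess
import sys

# sys.executable is the interpreter running this code, so the child
# process sees the same installed packages as the parent.
completed = subprocess.run(
    [sys.executable, "-c", "print('hello from a child process')"],
    capture_output=True,  # collect both stdout and stderr
    text=True,            # decode bytes to str automatically
    check=True,           # raise CalledProcessError on a non-zero exit status
)

# stdout is already a str, so no .decode("utf-8") is needed
print(completed.stdout)
```

With `check=True`, a failing command raises an exception instead of silently returning empty output, which may be easier to debug when the script path is wrong.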



*  `@click.group()` creates the main click application.
*  `@cli.command()` registers a function as a subcommand of that application.

In [21]:
%%file click_functions.py
import click

@click.group()
def cli():
    pass

@cli.command()
def one():
    click.echo('One-1')

@cli.command()
def two():
    click.echo('Two-2')

if __name__ == '__main__':
    cli()

Writing click_functions.py


In [24]:
!python click_functions.py
!python click_functions.py one
!python click_functions.py two

Usage: click_functions.py [OPTIONS] COMMAND
                          [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  one
  two
One-1
Two-2


In [27]:
import click
from click.testing import CliRunner

@click.command()
@click.argument('phrase')
def echo_phrase(phrase):
    click.echo('You said: %s' % phrase)

runner = CliRunner()
result = runner.invoke(echo_phrase, ['Have data will camp'])
print(runner)
print(result)
print(result.output)
assert result.output == 'You said: Have data will camp\n'

<click.testing.CliRunner object at 0x7f00ceb4eed0>
<Result okay>
You said: Have data will camp



*  Use `click.open_file()` to open a file for writing.
*  Use a loop to iterate over words.
*  Write the content in the `words` variable out to this file.
*  Read it back in to verify it was successful.

In [28]:
import click

# Setup
words = ["Asset", "Bubble", "10", "Year"]
filename = "words.txt"

# Write with click.open_file()
with click.open_file(filename, 'w') as f:
    # Loop over words with a for loop
    for word in words:
        f.write(f'{word}\n')

# Read it back
with open(filename) as output_file:
    print(output_file.read())


Asset
Bubble
10
Year



### Invoking command line tests

*  Define the `click` command using the two decorators: `@click.command()` and `@click.option()`.
*  Use `CliRunner()` from `click.testing` to run a command line click application.
*  Pass `--num 2` to the application via `runner.invoke()`.
*  Assert that `result` has an exit status of `0`.

First, the file `setup.py` is created below.

In [29]:
%%file setup.py

import click
from click.testing import CliRunner

from sklearn.datasets import make_blobs  # samples_generator was removed in newer scikit-learn releases
from sklearn.cluster import KMeans

def blobs():
    X, y = make_blobs(n_samples=10, centers=3, n_features=2,random_state=0)
    return X,y

def cluster(X, random_state=170, num=2):
    return KMeans(n_clusters=num, random_state=random_state).fit_predict(X)

def main(num):
    X,_ = blobs()
    return(cluster(X,num=num))


Writing setup.py


In [30]:
import click
from click.testing import CliRunner
from setup import main

# Define the click command
@click.command()
@click.option("--num", default=2, help="Number of clusters")
def run_cluster(num):
    result = main(num)
    click.echo(f'Cluster assignments: {result} for total clusters [{num}]')

# Create the click test runner
runner = CliRunner()

# Run the click app and assert it runs without error
result = runner.invoke(run_cluster, ['--num', '2'])
assert result.exit_code == 0
print(result.output)


Cluster assignments: [1 1 0 1 1 0 1 0 0 1] for total clusters [2]
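
The test above checks only the success path. As a rough sketch, the same `CliRunner` can also confirm that invalid input is rejected. The command `show_num` below is a hypothetical stand-in for `run_cluster` that skips the clustering step, so the example needs only `click`. The explicit `type=int` is an assumption added here to make the validation visible.

```python
import click
from click.testing import CliRunner

@click.command()
@click.option("--num", default=2, type=int, help="Number of clusters")
def show_num(num):
    # Hypothetical stand-in for run_cluster: echoes the option only
    click.echo(f"total clusters [{num}]")

runner = CliRunner()

# Valid input: click converts "3" to the integer 3
ok = runner.invoke(show_num, ["--num", "3"])

# Invalid input: "three" cannot be converted to an integer,
# so click reports a usage error instead of calling show_num
bad = runner.invoke(show_num, ["--num", "three"])

print(ok.exit_code, ok.output)
print(bad.exit_code)
```

`runner.invoke()` catches the exit raised by click, so a failing command does not stop the notebook; the non-zero status is available on `bad.exit_code`.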





Remember that a click option such as `--something` is passed to the decorated function as a parameter named `something`, so the function must declare a parameter with that name in order to use the value.
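
A small sketch of this naming rule, using hypothetical option names (`--num-clusters`, `--verbose`) that do not appear in the earlier cells. Note that click turns a dash inside an option name into an underscore in the parameter name.

```python
import click
from click.testing import CliRunner

@click.command()
@click.option("--num-clusters", default=2, help="Number of clusters")
@click.option("--verbose", is_flag=True, help="Print extra detail")
def report(num_clusters, verbose):
    # --num-clusters arrives as num_clusters, --verbose as verbose
    click.echo(f"num_clusters={num_clusters} verbose={verbose}")

# Invoke the command in-process, as in the testing cells above
result = CliRunner().invoke(report, ["--num-clusters", "4", "--verbose"])
print(result.output)
```

If the function signature does not match the option names, click raises a `TypeError` when the command is invoked, because it passes each option as a keyword argument.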